修改处理逻辑,按照文档分别处理。
This commit is contained in:
@@ -29,7 +29,7 @@ public class UpdateEFqdnAddressIp implements Runnable {
|
||||
BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null);
|
||||
if (newEdgeDocument != null){
|
||||
i += 1;
|
||||
BaseEdgeDocument edgeDocument = BaseArangoData.e_Fqdn_Address_Ip_Map.getOrDefault(key, null);
|
||||
BaseEdgeDocument edgeDocument = BaseArangoData.historyRelationFqdnAddressIpMap.getOrDefault(key, null);
|
||||
if (edgeDocument != null){
|
||||
Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME");
|
||||
long countTotal = Long.parseLong(newEdgeDocument.getAttribute("COUNT_TOTAL").toString());
|
||||
|
||||
@@ -29,7 +29,7 @@ public class UpdateEIpVisitFqdn implements Runnable {
|
||||
BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null);
|
||||
if (newEdgeDocument != null){
|
||||
i += 1;
|
||||
BaseEdgeDocument edgeDocument = BaseArangoData.e_Ip_Visit_Fqdn_Map.getOrDefault(key, null);
|
||||
BaseEdgeDocument edgeDocument = BaseArangoData.historyRelationIpVisitFqdnMap.getOrDefault(key, null);
|
||||
if (edgeDocument != null){
|
||||
Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME");
|
||||
long countTotal = Long.parseLong(newEdgeDocument.getAttribute("COUNT_TOTAL").toString());
|
||||
|
||||
@@ -33,7 +33,7 @@ public class UpdateVFqdn implements Runnable{
|
||||
|
||||
if (newDocument != null){
|
||||
i += 1;
|
||||
BaseDocument document = BaseArangoData.v_Fqdn_Map.getOrDefault(key, null);
|
||||
BaseDocument document = BaseArangoData.historyVertexFqdnMap.getOrDefault(key, null);
|
||||
if (document != null){
|
||||
Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME");
|
||||
long fqdnCountTotal = Long.parseLong(newDocument.getAttribute("FQDN_COUNT_TOTAL").toString());
|
||||
|
||||
@@ -31,7 +31,7 @@ public class UpdateVIP implements Runnable {
|
||||
BaseDocument newDocument = documentHashMap.getOrDefault(key, null);
|
||||
if (newDocument != null){
|
||||
i += 1;
|
||||
BaseDocument document = BaseArangoData.v_Ip_Map.getOrDefault(key, null);
|
||||
BaseDocument document = BaseArangoData.historyVertexIpMap.getOrDefault(key, null);
|
||||
if (document != null){
|
||||
Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME");
|
||||
long ipCountTotal = Long.parseLong(newDocument.getAttribute("IP_COUNT_TOTAL").toString());
|
||||
|
||||
@@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
* @author wlh
|
||||
@@ -22,38 +23,46 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
|
||||
private ConcurrentHashMap<String, T> map;
|
||||
private Class<T> type;
|
||||
private String table;
|
||||
private CountDownLatch countDownLatch;
|
||||
|
||||
public ReadHistoryArangoData(ArangoDBConnect arangoConnect, String query, ConcurrentHashMap<String, T> map, Class<T> type, String table) {
|
||||
public ReadHistoryArangoData(ArangoDBConnect arangoConnect, String query, ConcurrentHashMap<String, T> map, Class<T> type, String table,CountDownLatch countDownLatch) {
|
||||
this.arangoConnect = arangoConnect;
|
||||
this.query = query;
|
||||
this.map = map;
|
||||
this.type = type;
|
||||
this.table = table;
|
||||
this.countDownLatch = countDownLatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
long s = System.currentTimeMillis();
|
||||
ArangoCursor<T> docs = arangoConnect.executorQuery(query, type);
|
||||
if (docs != null) {
|
||||
List<T> baseDocuments = docs.asListRemaining();
|
||||
int i = 0;
|
||||
for (T doc : baseDocuments) {
|
||||
String key = doc.getKey();
|
||||
switch (table) {
|
||||
case "R_LOCATE_FQDN2IP":
|
||||
updateProtocolDocument(doc);
|
||||
break;
|
||||
case "R_VISIT_IP2FQDN":
|
||||
updateProtocolDocument(doc);
|
||||
break;
|
||||
default:
|
||||
try {
|
||||
long s = System.currentTimeMillis();
|
||||
ArangoCursor<T> docs = arangoConnect.executorQuery(query, type);
|
||||
if (docs != null) {
|
||||
List<T> baseDocuments = docs.asListRemaining();
|
||||
int i = 0;
|
||||
for (T doc : baseDocuments) {
|
||||
String key = doc.getKey();
|
||||
switch (table) {
|
||||
case "R_LOCATE_FQDN2IP":
|
||||
updateProtocolDocument(doc);
|
||||
break;
|
||||
case "R_VISIT_IP2FQDN":
|
||||
updateProtocolDocument(doc);
|
||||
break;
|
||||
default:
|
||||
}
|
||||
map.put(key, doc);
|
||||
i++;
|
||||
}
|
||||
map.put(key, doc);
|
||||
i++;
|
||||
long l = System.currentTimeMillis();
|
||||
LOG.info(query + "\n读取数据" + i + "条,运行时间:" + (l - s));
|
||||
}
|
||||
long l = System.currentTimeMillis();
|
||||
LOG.info(query + "\n处理数据" + i + "条,运行时间:" + (l - s));
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}finally {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ public class Document<T extends BaseDocument> extends Thread{
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info(collectionName+" new Map 大小:"+newDocumentMap.size());
|
||||
Set<String> keySet = newDocumentMap.keySet();
|
||||
ArrayList<T> resultDocumentList = new ArrayList<>();
|
||||
int i = 0;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package cn.ac.iie.service.relationship;
|
||||
package cn.ac.iie.service.update.relationship;
|
||||
|
||||
import cn.ac.iie.service.read.ReadClickhouseData;
|
||||
import cn.ac.iie.service.update.Relationship;
|
||||
@@ -1,4 +1,4 @@
|
||||
package cn.ac.iie.service.relationship;
|
||||
package cn.ac.iie.service.update.relationship;
|
||||
|
||||
import cn.ac.iie.service.update.Relationship;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
@@ -1,4 +1,4 @@
|
||||
package cn.ac.iie.service.relationship;
|
||||
package cn.ac.iie.service.update.relationship;
|
||||
|
||||
import cn.ac.iie.service.update.Relationship;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
@@ -1,21 +0,0 @@
|
||||
package cn.ac.iie.service.vertex;
|
||||
|
||||
import cn.ac.iie.service.update.Vertex;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.entity.BaseDocument;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class Fqdn extends Vertex {
|
||||
|
||||
public Fqdn(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
|
||||
ArangoDBConnect arangoManger,
|
||||
String collectionName,
|
||||
ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
|
||||
CountDownLatch countDownLatch) {
|
||||
super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch);
|
||||
}
|
||||
}
|
||||
@@ -1,79 +0,0 @@
|
||||
package cn.ac.iie.service.vertex;
|
||||
|
||||
import cn.ac.iie.service.update.Vertex;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.entity.BaseDocument;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class Ip extends Vertex {
|
||||
|
||||
public Ip(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
|
||||
ArangoDBConnect arangoManger,
|
||||
String collectionName,
|
||||
ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
|
||||
CountDownLatch countDownLatch) {
|
||||
super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) {
|
||||
super.updateFunction(newDocument, historyDocument);
|
||||
updateIpByType(newDocument, historyDocument);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void mergeFunction(Map<String, Object> properties, BaseDocument doc) {
|
||||
super.mergeFunction(properties, doc);
|
||||
mergeIpByType(properties,doc);
|
||||
}
|
||||
|
||||
private void mergeIpByType(Map<String, Object> properties, BaseDocument doc){
|
||||
Map<String, Object> mergeProperties = doc.getProperties();
|
||||
checkIpTypeProperty(properties,mergeProperties,"CLIENT_SESSION_COUNT");
|
||||
checkIpTypeProperty(properties,mergeProperties,"CLIENT_BYTES_SUM");
|
||||
checkIpTypeProperty(properties,mergeProperties,"SERVER_SESSION_COUNT");
|
||||
checkIpTypeProperty(properties,mergeProperties,"SERVER_BYTES_SUM");
|
||||
}
|
||||
|
||||
private void checkIpTypeProperty(Map<String, Object> properties,Map<String, Object> mergeProperties,String property){
|
||||
try {
|
||||
if (!properties.containsKey(property)){
|
||||
properties.put(property,0L);
|
||||
checkIpTypeProperty(properties,mergeProperties,property);
|
||||
}else if ("0".equals(properties.get(property).toString()) && mergeProperties.containsKey(property)){
|
||||
if (!"0".equals(mergeProperties.get(property).toString())){
|
||||
properties.put(property,Long.parseLong(mergeProperties.get(property).toString()));
|
||||
}
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument){
|
||||
addProperty(newDocument,historyDocument,"CLIENT_SESSION_COUNT");
|
||||
addProperty(newDocument,historyDocument,"CLIENT_BYTES_SUM");
|
||||
addProperty(newDocument,historyDocument,"SERVER_SESSION_COUNT");
|
||||
addProperty(newDocument,historyDocument,"SERVER_BYTES_SUM");
|
||||
}
|
||||
|
||||
private void addProperty(BaseDocument newDocument, BaseDocument historyDocument,String property){
|
||||
try {
|
||||
if (historyDocument.getProperties().containsKey(property)){
|
||||
long newProperty = Long.parseLong(newDocument.getAttribute(property).toString());
|
||||
long hisProperty = Long.parseLong(historyDocument.getAttribute(property).toString());
|
||||
historyDocument.updateAttribute(property,newProperty+hisProperty);
|
||||
}else {
|
||||
historyDocument.addAttribute(property,0L);
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,21 +0,0 @@
|
||||
package cn.ac.iie.service.vertex;
|
||||
|
||||
import cn.ac.iie.service.update.Vertex;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.entity.BaseDocument;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class Subscriber extends Vertex {
|
||||
|
||||
public Subscriber(HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap,
|
||||
ArangoDBConnect arangoManger,
|
||||
String collectionName,
|
||||
ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
|
||||
CountDownLatch countDownLatch) {
|
||||
super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user