修改DIST_CIP未更新bug
This commit is contained in:
@@ -44,20 +44,16 @@ public class UpdateGraphData {
|
|||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN",
|
updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class,
|
||||||
Fqdn.class,BaseDocument.class,
|
|
||||||
ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
|
ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
|
||||||
|
|
||||||
updateDocument(newVertexIpMap,historyVertexIpMap,"IP",
|
updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class,
|
||||||
Ip.class,BaseDocument.class,
|
|
||||||
ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
|
ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
|
||||||
|
|
||||||
updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER",
|
updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER", Subscriber.class,BaseDocument.class,
|
||||||
Subscriber.class,BaseDocument.class,
|
|
||||||
ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument);
|
ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument);
|
||||||
|
|
||||||
updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP",
|
updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP", LocateFqdn2Ip.class,BaseEdgeDocument.class,
|
||||||
LocateFqdn2Ip.class,BaseEdgeDocument.class,
|
|
||||||
ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument);
|
ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument);
|
||||||
|
|
||||||
// updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
|
// updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
|
||||||
|
|||||||
@@ -264,8 +264,8 @@ public class ReadClickhouseData {
|
|||||||
|
|
||||||
public static String getVertexIpSql() {
|
public static String getVertexIpSql() {
|
||||||
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
||||||
String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
|
String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
|
||||||
String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
|
String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
|
||||||
return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
|
return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
|
|||||||
long s = System.currentTimeMillis();
|
long s = System.currentTimeMillis();
|
||||||
ArangoCursor<T> docs = arangoConnect.executorQuery(query, type);
|
ArangoCursor<T> docs = arangoConnect.executorQuery(query, type);
|
||||||
if (docs != null) {
|
if (docs != null) {
|
||||||
|
ArrayList<T> list = new ArrayList<>();
|
||||||
List<T> baseDocuments = docs.asListRemaining();
|
List<T> baseDocuments = docs.asListRemaining();
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (T doc : baseDocuments) {
|
for (T doc : baseDocuments) {
|
||||||
@@ -58,9 +59,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
|
|||||||
case "R_LOCATE_FQDN2IP":
|
case "R_LOCATE_FQDN2IP":
|
||||||
updateProtocolDocument(doc);
|
updateProtocolDocument(doc);
|
||||||
deleteDistinctClientIpByTime(doc);
|
deleteDistinctClientIpByTime(doc);
|
||||||
break;
|
list.add(doc);
|
||||||
case "R_VISIT_IP2FQDN":
|
|
||||||
updateProtocolDocument(doc);
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
@@ -69,6 +68,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
|
|||||||
tmpMap.put(key, doc);
|
tmpMap.put(key, doc);
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
arangoConnect.overwrite(list,table);
|
||||||
long l = System.currentTimeMillis();
|
long l = System.currentTimeMillis();
|
||||||
LOG.info(query + "\n读取" + i + "条数据,运行时间:" + (l - s));
|
LOG.info(query + "\n读取" + i + "条数据,运行时间:" + (l - s));
|
||||||
}
|
}
|
||||||
@@ -99,10 +99,11 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
|
|||||||
ArrayList<Long> distCipTs = (ArrayList<Long>) doc.getAttribute("DIST_CIP_TS");
|
ArrayList<Long> distCipTs = (ArrayList<Long>) doc.getAttribute("DIST_CIP_TS");
|
||||||
distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600);
|
distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600);
|
||||||
Collections.sort(distCipTs);
|
Collections.sort(distCipTs);
|
||||||
|
Collections.reverse(distCipTs);
|
||||||
int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600);
|
int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600);
|
||||||
String[] distCipArr = new String[index];
|
String[] distCipArr = new String[index];
|
||||||
long[] disCipTsArr = new long[index];
|
long[] disCipTsArr = new long[index];
|
||||||
if (distCip.size() + 1 == distCipTs.size()){
|
if (index != 0 && distCip.size() + 1 == distCipTs.size()){
|
||||||
for (int i = 0; i < index; i++) {
|
for (int i = 0; i < index; i++) {
|
||||||
distCipArr[i] = distCip.get(i);
|
distCipArr[i] = distCip.get(i);
|
||||||
disCipTsArr[i] = distCipTs.get(i);
|
disCipTsArr[i] = distCipTs.get(i);
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ thread.await.termination.time=10
|
|||||||
|
|
||||||
|
|
||||||
#读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围
|
#读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围
|
||||||
time.limit.type=1
|
time.limit.type=0
|
||||||
read.clickhouse.max.time=1596684142
|
read.clickhouse.max.time=1596684142
|
||||||
read.clickhouse.min.time=1596425769
|
read.clickhouse.min.time=1596425769
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import java.util.List;
|
|||||||
|
|
||||||
public class TestList {
|
public class TestList {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
|
/*
|
||||||
ArangoDBConnect arangoConnect = ArangoDBConnect.getInstance();
|
ArangoDBConnect arangoConnect = ArangoDBConnect.getInstance();
|
||||||
ArangoCursor<BaseEdgeDocument> documents = arangoConnect.executorQuery("FOR doc IN R_LOCATE_FQDN2IP filter doc.FIRST_FOUND_TIME >= 1596080839 and doc.FIRST_FOUND_TIME <= 1596395473 RETURN doc", BaseEdgeDocument.class);
|
ArangoCursor<BaseEdgeDocument> documents = arangoConnect.executorQuery("FOR doc IN R_LOCATE_FQDN2IP filter doc.FIRST_FOUND_TIME >= 1596080839 and doc.FIRST_FOUND_TIME <= 1596395473 RETURN doc", BaseEdgeDocument.class);
|
||||||
List<BaseEdgeDocument> baseEdgeDocuments = documents.asListRemaining();
|
List<BaseEdgeDocument> baseEdgeDocuments = documents.asListRemaining();
|
||||||
@@ -18,8 +19,8 @@ public class TestList {
|
|||||||
doc.updateAttribute("PROTOCOL_TYPE","123");
|
doc.updateAttribute("PROTOCOL_TYPE","123");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
/*
|
|
||||||
ArrayList<Integer> integers = new ArrayList<>();
|
ArrayList<Integer> integers = new ArrayList<>();
|
||||||
integers.add(10);
|
integers.add(10);
|
||||||
integers.add(8);
|
integers.add(8);
|
||||||
@@ -39,7 +40,9 @@ public class TestList {
|
|||||||
integers.add(5);
|
integers.add(5);
|
||||||
Collections.sort(integers);
|
Collections.sort(integers);
|
||||||
System.out.println(integers);
|
System.out.println(integers);
|
||||||
|
Collections.reverse(integers);
|
||||||
|
System.out.println(integers);
|
||||||
System.out.println(integers.indexOf(5));
|
System.out.println(integers.indexOf(5));
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user