Modify ip vertex attributes

wanglihui
2020-07-10 19:03:48 +08:00
parent d214e86a41
commit 705ed03926
6 changed files with 93 additions and 37 deletions
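
The IP vertex is now built from per-role aggregates: getVertexIpSql unions a client-side and a server-side GROUP BY over connection_record_log, getVertexIpDocument stores the resulting CLIENT_/SERVER_SESSION_COUNT and CLIENT_/SERVER_BYTES_SUM attributes (zeroing the opposite role), and the new merge/update hooks in Ip combine the two roles per IP. Alongside this, the unused ResultSet parameter is dropped from putMapByHashcode, Relationship and Vertex move from System.out to SLF4J logging, and read.clickhouse.max.time is advanced.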

View File

@@ -129,7 +129,7 @@ public class BaseClickhouseData {
while (resultSet.next()){
BaseEdgeDocument newDoc = getRelationshipSubsciberLocateIpDocument(resultSet);
String key = newDoc.getKey();
putMapByHashcode(resultSet, newDoc, eSubsciberLocateIpMap,key);
putMapByHashcode(newDoc, eSubsciberLocateIpMap,key);
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse ESubsciberLocateIp时间" + (last - start));
@@ -154,7 +154,7 @@ public class BaseClickhouseData {
while (resultSet.next()) {
BaseEdgeDocument newDoc = getRelationFqdnAddressIpDocument(resultSet);
String commonSchemaType = resultSet.getString("common_schema_type");
putMapByHashcode(resultSet, newDoc, eFqdnAddressIpMap,commonSchemaType);
putMapByHashcode(newDoc, eFqdnAddressIpMap,commonSchemaType);
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间" + (last - start));
@@ -177,7 +177,7 @@ public class BaseClickhouseData {
while (resultSet.next()) {
BaseEdgeDocument newDoc = getRelationIpVisitFqdnDocument(resultSet);
String commonSchemaType = resultSet.getString("common_schema_type");
putMapByHashcode(resultSet, newDoc, eIpVisitFqdnMap,commonSchemaType);
putMapByHashcode(newDoc, eIpVisitFqdnMap,commonSchemaType);
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间" + (last - start));
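
The only change to these three call sites is dropping the ResultSet argument, which putMapByHashcode itself never used; the bucketing is unchanged. A minimal standalone sketch of that routing rule, assuming thread.pool.number=10 from the properties file below and a made-up key:

String key = "192.0.2.1"; // e.g. an IP vertex key (hypothetical value)
int bucket = Math.abs(key.hashCode()) % 10; // same rule as the method body, with 10 = ApplicationConfig.THREAD_POOL_NUMBER

Each bucket holds the documents one worker thread will write, so rows with the same key always land in the same thread's map.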

View File

@@ -32,27 +32,31 @@ public class ReadClickhouseData {
}
public static BaseDocument getVertexIpDocument(ResultSet resultSet) throws SQLException {
BaseDocument newDoc = new BaseDocument();
String ip = resultSet.getString("IP");
String location = resultSet.getString("location");
String[] locationSplit = location.split(";");
String ipLocationNation;
String ipLocationRegion;
if (locationSplit.length == 3) {
ipLocationNation = locationSplit[0];
ipLocationRegion = locationSplit[1];
} else {
ipLocationNation = location;
ipLocationRegion = location;
}
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
BaseDocument newDoc = new BaseDocument();
long sessionCount = resultSet.getLong("SESSION_COUNT");
long bytesSum = resultSet.getLong("BYTES_SUM");
String ipType = resultSet.getString("ip_type");
newDoc.setKey(ip);
newDoc.addAttribute("IP", ip);
newDoc.addAttribute("IP_LOCATION_NATION", ipLocationNation);
newDoc.addAttribute("IP_LOCATION_REGION", ipLocationRegion);
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
switch (ipType) {
case "client":
newDoc.addAttribute("CLIENT_SESSION_COUNT", sessionCount);
newDoc.addAttribute("CLIENT_BYTES_SUM", bytesSum);
newDoc.addAttribute("SERVER_SESSION_COUNT",0L);
newDoc.addAttribute("SERVER_BYTES_SUM",0L);
break;
case "server":
newDoc.addAttribute("SERVER_SESSION_COUNT", sessionCount);
newDoc.addAttribute("SERVER_BYTES_SUM", bytesSum);
newDoc.addAttribute("CLIENT_SESSION_COUNT",0L);
newDoc.addAttribute("CLIENT_BYTES_SUM",0L);
break;
}
return newDoc;
}
@@ -133,8 +137,8 @@ public class ReadClickhouseData {
return newDoc;
}
public static void putMapByHashcode(ResultSet resultSet, BaseEdgeDocument newDoc, HashMap<Integer, HashMap<String, HashMap<String, BaseEdgeDocument>>> map,String schema) throws SQLException {
if (newDoc != null){
public static void putMapByHashcode(BaseEdgeDocument newDoc, HashMap<Integer, HashMap<String, HashMap<String, BaseEdgeDocument>>> map, String schema) throws SQLException {
if (newDoc != null) {
String key = newDoc.getKey();
int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
HashMap<String, HashMap<String, BaseEdgeDocument>> documentHashMap = map.getOrDefault(i, new HashMap());
@@ -145,7 +149,7 @@ public class ReadClickhouseData {
}
}
public static boolean isDomain(String fqdn) {
private static boolean isDomain(String fqdn) {
try {
String[] fqdnArr = fqdn.split("\\.");
if (fqdnArr.length < 4 || fqdnArr.length > 4) {
@@ -182,10 +186,10 @@ public class ReadClickhouseData {
long[] timeLimit = getTimeLimit();
long maxTime = timeLimit[0];
long minTime = timeLimit[1];
String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime + " AND (common_schema_type = 'HTTP' or common_schema_type = 'SSL')";
String clientIpSql = "SELECT common_client_ip AS IP, common_client_location AS location, common_recv_time FROM tsg_galaxy_v3.connection_record_log where " + where;
String serverIpSql = "SELECT common_server_ip AS IP, common_server_location AS location, common_recv_time FROM tsg_galaxy_v3.connection_record_log where " + where;
return "SELECT IP,location,MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,COUNT(*) AS IP_COUNT_TOTAL FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + ")) GROUP BY IP,location";
String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime;
String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
}
public static String getEFqdnAddressIpSql() {
@@ -208,20 +212,20 @@ public class ReadClickhouseData {
return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''";
}
public static String getVertexSubscriberSql(){
public static String getVertexSubscriberSql() {
long[] timeLimit = getTimeLimit();
long maxTime = timeLimit[0];
long minTime = timeLimit[1];
String where = " common_recv_time >= "+minTime+" AND common_recv_time <= "+maxTime+" AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE"+where+" GROUP BY common_subscriber_id";
String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id";
}
public static String getRelationshipSubsciberLocateIpSql(){
public static String getRelationshipSubsciberLocateIpSql() {
long[] timeLimit = getTimeLimit();
long maxTime = timeLimit[0];
long minTime = timeLimit[1];
String where = " common_recv_time >= "+minTime+" AND common_recv_time <= "+maxTime+" AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE"+where+" GROUP BY common_subscriber_id,radius_framed_ip";
String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id,radius_framed_ip";
}
private static long[] getTimeLimit() {
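For reference, substituting the window bounds from the properties file below (min 1593676953, max 1594376834), the new getVertexIpSql assembles a query along these lines (reformatted here for readability):

SELECT * FROM(
    (SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,
            MAX(common_recv_time) AS LAST_FOUND_TIME, count(*) as SESSION_COUNT,
            sum(common_c2s_byte_num) as BYTES_SUM, 'client' as ip_type
     FROM tsg_galaxy_v3.connection_record_log
     where common_recv_time >= 1593676953 AND common_recv_time <= 1594376834
     group by IP)
    UNION ALL
    (SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,
            MAX(common_recv_time) AS LAST_FOUND_TIME, count(*) as SESSION_COUNT,
            sum(common_s2c_byte_num) as BYTES_SUM, 'server' as ip_type
     FROM tsg_galaxy_v3.connection_record_log
     where common_recv_time >= 1593676953 AND common_recv_time <= 1594376834
     group by IP))

Each branch yields one row per IP per role; getVertexIpDocument maps ip_type to the matching CLIENT_* or SERVER_* attributes and zeroes the opposite pair, leaving the Ip vertex class to fold the two rows together.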

View File

@@ -3,12 +3,15 @@ package cn.ac.iie.service.update;
import cn.ac.iie.config.ApplicationConfig;
import cn.ac.iie.utils.ArangoDBConnect;
import com.arangodb.entity.BaseEdgeDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
public class Relationship extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(Relationship.class);
protected HashMap<String, HashMap<String, BaseEdgeDocument>> newDocumentHashMap;
protected ArangoDBConnect arangoManger;
@@ -43,18 +46,18 @@ public class Relationship extends Thread {
updateRelationship(newEdgeDocument,historyEdgeDocument,docInsert);
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
arangoManger.overwrite(docInsert, collectionName);
System.out.println("更新"+collectionName+":" + i);
LOG.info("更新"+collectionName+":" + i);
i = 0;
}
}
}
if (i != 0) {
arangoManger.overwrite(docInsert, collectionName);
System.out.println("更新"+collectionName+":" + i);
LOG.info("更新"+collectionName+":" + i);
}
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.toString());
LOG.error(e.toString());
}finally {
countDownLatch.countDown();
}

View File

@@ -4,6 +4,8 @@ import cn.ac.iie.config.ApplicationConfig;
import cn.ac.iie.utils.ArangoDBConnect;
import com.arangodb.entity.BaseDocument;
import com.arangodb.entity.BaseEdgeDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
@@ -17,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
* 多线程更新vertex数据
*/
public class Vertex extends Thread{
private static final Logger LOG = LoggerFactory.getLogger(Vertex.class);
protected HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap;
protected ArangoDBConnect arangoManger;
@@ -52,16 +55,17 @@ public class Vertex extends Thread{
}
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
arangoManger.overwrite(docInsert,collectionName);
System.out.println("更新"+i);
LOG.info("更新"+collectionName+":"+i);
i = 0;
}
}
if (i != 0){
arangoManger.overwrite(docInsert,collectionName);
System.out.println("更新"+i);
LOG.info("更新"+collectionName+":"+i);
}
}catch (Exception e){
e.printStackTrace();
LOG.error(e.toString());
}finally {
countDownLatch.countDown();
}
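
Both Relationship and Vertex now report batch progress through SLF4J rather than System.out. As a possible further step (not part of this commit), SLF4J's parameterized form would skip the string concatenation entirely when INFO is disabled:

LOG.info("更新{}:{}", collectionName, i); // placeholders filled only if the level is enabled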

View File

@@ -24,12 +24,57 @@ public class Ip extends Vertex {
@Override
protected void mergeFunction(Map<String, Object> properties, BaseDocument doc) {
super.mergeFunction(properties, doc);
mergeIpByType(properties,doc);
}
private void mergeIpByType(Map<String, Object> properties, BaseDocument doc){
Map<String, Object> mergeProperties = doc.getProperties();
checkIpTypeProperty(properties,mergeProperties,"CLIENT_SESSION_COUNT");
checkIpTypeProperty(properties,mergeProperties,"CLIENT_BYTES_SUM");
checkIpTypeProperty(properties,mergeProperties,"SERVER_SESSION_COUNT");
checkIpTypeProperty(properties,mergeProperties,"SERVER_BYTES_SUM");
}
private void checkIpTypeProperty(Map<String, Object> properties,Map<String, Object> mergeProperties,String property){
try {
if (!properties.containsKey(property)){
properties.put(property,0L);
checkIpTypeProperty(properties,mergeProperties,property);
}else if (properties.get(property).toString().equals("0") && mergeProperties.containsKey(property)){
if (!mergeProperties.get(property).toString().equals("0")){
properties.put(property,Long.parseLong(mergeProperties.get(property).toString()));
}
}
}catch (Exception e){
e.printStackTrace();
}
}
@Override
protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) {
super.updateFunction(newDocument, historyDocument);
updateIpByType(newDocument, historyDocument);
}
private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument){
addProperty(newDocument,historyDocument,"CLIENT_SESSION_COUNT");
addProperty(newDocument,historyDocument,"CLIENT_BYTES_SUM");
addProperty(newDocument,historyDocument,"SERVER_SESSION_COUNT");
addProperty(newDocument,historyDocument,"SERVER_BYTES_SUM");
}
private void addProperty(BaseDocument newDocument, BaseDocument historyDocument,String property){
try {
if (historyDocument.getProperties().containsKey(property)){
long newProperty = Long.parseLong(newDocument.getAttribute(property).toString());
long hisProperty = Long.parseLong(historyDocument.getAttribute(property).toString());
historyDocument.updateAttribute(property,newProperty+hisProperty);
}else {
historyDocument.addAttribute(property,0L);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
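
To make the new merge behavior concrete, here is a hypothetical standalone sketch (key and counter values invented for illustration) of the effect of checkIpTypeProperty when a 'client' row and a 'server' row for the same IP meet; in production this logic runs inside Ip.mergeFunction, not a main method:

import com.arangodb.entity.BaseDocument;

public class IpMergeSketch {
    public static void main(String[] args) {
        // Row produced by getVertexIpDocument for ip_type = 'client':
        BaseDocument clientRow = new BaseDocument();
        clientRow.setKey("192.0.2.1");
        clientRow.addAttribute("CLIENT_SESSION_COUNT", 10L);
        clientRow.addAttribute("CLIENT_BYTES_SUM", 2048L);
        clientRow.addAttribute("SERVER_SESSION_COUNT", 0L); // placeholder for the other role
        clientRow.addAttribute("SERVER_BYTES_SUM", 0L);     // placeholder for the other role

        // Row for the same IP with ip_type = 'server':
        BaseDocument serverRow = new BaseDocument();
        serverRow.addAttribute("SERVER_SESSION_COUNT", 5L);
        serverRow.addAttribute("SERVER_BYTES_SUM", 512L);

        // checkIpTypeProperty copies a counter across whenever one side still
        // carries the 0L placeholder and the other side has a real value:
        for (String p : new String[]{"SERVER_SESSION_COUNT", "SERVER_BYTES_SUM"}) {
            if (clientRow.getAttribute(p).toString().equals("0")
                    && serverRow.getProperties().containsKey(p)) {
                clientRow.updateAttribute(p, Long.parseLong(serverRow.getAttribute(p).toString()));
            }
        }
        // clientRow now carries both roles:
        // CLIENT_SESSION_COUNT=10, CLIENT_BYTES_SUM=2048,
        // SERVER_SESSION_COUNT=5,  SERVER_BYTES_SUM=512
    }
}

updateIpByType handles the other direction: when a document for the IP already exists in ArangoDB, addProperty sums the new counters onto the history document's values.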

View File

@@ -13,5 +13,5 @@ update.arango.batch=10000
thread.pool.number=10
thread.await.termination.time=10
read.clickhouse.max.time=1594194404
read.clickhouse.max.time=1594376834
read.clickhouse.min.time=1593676953
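Both read.clickhouse bounds are Unix epoch seconds: the updated max, 1594376834, falls at roughly 2020-07-10 18:27 +08:00, about half an hour before this commit, and the min, 1593676953, at roughly 2020-07-02 16:02 +08:00, so the ClickHouse read window covers about eight days.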