first commit
This commit is contained in:
9
IP-learning-graph/.gitignore
vendored
Normal file
9
IP-learning-graph/.gitignore
vendored
Normal file
@@ -0,0 +1,9 @@
|
||||
# Created by .ignore support plugin (hsz.mobi)
|
||||
### Example user template template
|
||||
### Example user template
|
||||
|
||||
# IntelliJ project files
|
||||
.idea
|
||||
*.iml
|
||||
target
|
||||
logs/
|
||||
87
IP-learning-graph/pom.xml
Normal file
87
IP-learning-graph/pom.xml
Normal file
@@ -0,0 +1,87 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>cn.ac.iie</groupId>
|
||||
<artifactId>IP-learning-graph</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.21</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<version>1.7.21</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>ru.yandex.clickhouse</groupId>
|
||||
<artifactId>clickhouse-jdbc</artifactId>
|
||||
<version>0.2.4</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>1.1.10</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.typesafe</groupId>
|
||||
<artifactId>config</artifactId>
|
||||
<version>1.2.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.arangodb</groupId>
|
||||
<artifactId>arangodb-java-driver</artifactId>
|
||||
<version>5.0.4</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<version>2.6</version>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifest>
|
||||
<mainClass>cn.ac.iie.test.IpLearningApplicationTest</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
<descriptorRefs>
|
||||
<descriptorRef>jar-with-dependencies</descriptorRef>
|
||||
</descriptorRefs>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>make-assembly</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>8</source>
|
||||
<target>8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,25 @@
|
||||
package cn.ac.iie.config;
|
||||
|
||||
|
||||
import cn.ac.iie.utils.ConfigUtils;
|
||||
|
||||
public class ApplicationConfig {
|
||||
|
||||
public static final String ARANGODB_HOST = ConfigUtils.getStringProperty( "arangoDB.host");
|
||||
public static final Integer ARANGODB_PORT = ConfigUtils.getIntProperty("arangoDB.port");
|
||||
public static final String ARANGODB_USER = ConfigUtils.getStringProperty( "arangoDB.user");
|
||||
public static final String ARANGODB_PASSWORD = ConfigUtils.getStringProperty( "arangoDB.password");
|
||||
public static final String ARANGODB_DB_NAME = ConfigUtils.getStringProperty( "arangoDB.DB.name");
|
||||
public static final Integer ARANGODB_TTL = ConfigUtils.getIntProperty( "arangoDB.ttl");
|
||||
public static final Integer ARANGODB_BATCH = ConfigUtils.getIntProperty( "arangoDB.batch");
|
||||
|
||||
public static final Integer UPDATE_ARANGO_BATCH = ConfigUtils.getIntProperty("update.arango.batch");
|
||||
|
||||
public static final Integer THREAD_POOL_NUMBER = ConfigUtils.getIntProperty( "thread.pool.number");
|
||||
public static final Integer THREAD_AWAIT_TERMINATION_TIME = ConfigUtils.getIntProperty( "thread.await.termination.time");
|
||||
|
||||
public static final Long READ_CLICKHOUSE_MAX_TIME = ConfigUtils.getLongProperty("read.clickhouse.max.time");
|
||||
public static final Long READ_CLICKHOUSE_MIN_TIME = ConfigUtils.getLongProperty("read.clickhouse.min.time");
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
package cn.ac.iie.dao;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import cn.ac.iie.etl.ArangoEFqdnAddressIpToMap;
|
||||
import cn.ac.iie.etl.ArangoEIpVisitFqdnToMap;
|
||||
import cn.ac.iie.etl.ArangoVFqdnToMap;
|
||||
import cn.ac.iie.etl.ArangoVIpToMap;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import cn.ac.iie.utils.ExecutorThreadPool;
|
||||
import com.arangodb.ArangoCursor;
|
||||
import com.arangodb.entity.BaseDocument;
|
||||
import com.arangodb.entity.BaseEdgeDocument;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class BaseArangoData {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class);
|
||||
|
||||
public static ConcurrentHashMap<String, BaseDocument> v_Fqdn_Map = new ConcurrentHashMap<>();
|
||||
public static ConcurrentHashMap<String, BaseDocument> v_Ip_Map = new ConcurrentHashMap<>();
|
||||
public static ConcurrentHashMap<String, BaseEdgeDocument> e_Fqdn_Address_Ip_Map = new ConcurrentHashMap<>();
|
||||
public static ConcurrentHashMap<String, BaseEdgeDocument> e_Ip_Visit_Fqdn_Map = new ConcurrentHashMap<>();
|
||||
|
||||
private static final ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance();
|
||||
|
||||
private static final ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
|
||||
|
||||
public static void BaseVFqdnDataMap() {
|
||||
String sql = "LET FQDN = (FOR doc IN FQDN RETURN doc) return {max_time:MAX(FQDN[*].FIRST_FOUND_TIME),min_time:MIN(FQDN[*].FIRST_FOUND_TIME)}";
|
||||
long[] timeLimit = getTimeLimit(sql);
|
||||
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
|
||||
ArangoVFqdnToMap ArangoVFqdnToMap = new ArangoVFqdnToMap(arangoDBConnect, timeLimit[0], timeLimit[2],i);
|
||||
threadPool.executor(ArangoVFqdnToMap);
|
||||
}
|
||||
}
|
||||
|
||||
public static void BaseVIpDataMap() {
|
||||
String sql = "LET IP = (FOR doc IN IP RETURN doc) return {max_time:MAX(IP[*].FIRST_FOUND_TIME),min_time:MIN(IP[*].FIRST_FOUND_TIME)}";
|
||||
long[] timeLimit = getTimeLimit(sql);
|
||||
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
|
||||
ArangoVIpToMap arangoVIpToMap = new ArangoVIpToMap(arangoDBConnect, timeLimit[0], timeLimit[2],i);
|
||||
threadPool.executor(arangoVIpToMap);
|
||||
}
|
||||
}
|
||||
|
||||
public static void BaseEFqdnAddressIpDataMap(){
|
||||
String sql = "LET e = (FOR doc IN R_LOCATE_FQDN2IP RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}";
|
||||
long[] timeLimit = getTimeLimit(sql);
|
||||
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
|
||||
ArangoEFqdnAddressIpToMap arangoEFqdnAddressIpToMap = new ArangoEFqdnAddressIpToMap(arangoDBConnect, timeLimit[0], timeLimit[2], i);
|
||||
threadPool.executor(arangoEFqdnAddressIpToMap);
|
||||
}
|
||||
}
|
||||
|
||||
public static void BaseEIpVisitFqdnDataMap(){
|
||||
String sql = "LET e = (FOR doc IN R_VISIT_IP2FQDN RETURN doc) return {max_time:MAX(e[*].FIRST_FOUND_TIME),min_time:MIN(e[*].FIRST_FOUND_TIME)}";
|
||||
long[] timeLimit = getTimeLimit(sql);
|
||||
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
|
||||
ArangoEIpVisitFqdnToMap arangoEIpVisitFqdnToMap = new ArangoEIpVisitFqdnToMap(arangoDBConnect, timeLimit[0], timeLimit[2], i);
|
||||
threadPool.executor(arangoEIpVisitFqdnToMap);
|
||||
}
|
||||
}
|
||||
|
||||
private static long[] getTimeLimit(String sql) {
|
||||
long minTime = 0L;
|
||||
long maxTime = 0L;
|
||||
long diffTime = 0L;
|
||||
long startTime = System.currentTimeMillis();
|
||||
LOG.info(sql);
|
||||
ArangoCursor<BaseDocument> timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class);
|
||||
try {
|
||||
if (timeDoc != null){
|
||||
while (timeDoc.hasNext()) {
|
||||
BaseDocument doc = timeDoc.next();
|
||||
maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER;
|
||||
minTime = Long.parseLong(doc.getAttribute("min_time").toString());
|
||||
}
|
||||
long lastTime = System.currentTimeMillis();
|
||||
LOG.info("查询最大最小时间用时:" + (lastTime - startTime));
|
||||
diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER;
|
||||
}else {
|
||||
LOG.warn("获取ArangoDb时间范围为空");
|
||||
}
|
||||
}catch (Exception e){
|
||||
LOG.error(e.toString());
|
||||
}
|
||||
return new long[]{minTime, maxTime, diffTime};
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package cn.ac.iie.etl;
|
||||
|
||||
import cn.ac.iie.dao.BaseArangoData;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.ArangoCursor;
|
||||
import com.arangodb.entity.BaseEdgeDocument;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class ArangoEFqdnAddressIpToMap implements Runnable{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ArangoEFqdnAddressIpToMap.class);
|
||||
|
||||
private ArangoDBConnect arangoDBConnect;
|
||||
private long finalMinTime;
|
||||
private long diffTime;
|
||||
private int threadNumber;
|
||||
|
||||
private ArangoEFqdnAddressIpToMap(){}
|
||||
|
||||
public ArangoEFqdnAddressIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) {
|
||||
this.arangoDBConnect = arangoDBConnect;
|
||||
this.finalMinTime = finalMinTime;
|
||||
this.diffTime = diffTime;
|
||||
this.threadNumber = threadNumber;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
String name = Thread.currentThread().getName();
|
||||
long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime;
|
||||
long minThreadTime = finalMinTime + threadNumber * diffTime;
|
||||
String query = "FOR doc IN R_LOCATE_FQDN2IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc";
|
||||
LOG.info(name + ":" + query);
|
||||
long s = System.currentTimeMillis();
|
||||
try {
|
||||
ArangoCursor<BaseEdgeDocument> docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class);
|
||||
if (docs != null){
|
||||
List<BaseEdgeDocument> baseDocuments = docs.asListRemaining();
|
||||
int i = 0;
|
||||
for (BaseEdgeDocument doc : baseDocuments) {
|
||||
String key = doc.getKey();
|
||||
BaseArangoData.e_Fqdn_Address_Ip_Map.put(key, doc);
|
||||
i++;
|
||||
}
|
||||
LOG.info(name + ":共处理数据" + i);
|
||||
long l = System.currentTimeMillis();
|
||||
LOG.info(name + "运行时间:" + (l - s));
|
||||
}else {
|
||||
LOG.warn("查询R_LOCATE_FQDN2IP异常,结果为空");
|
||||
}
|
||||
}catch (Exception e){
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package cn.ac.iie.etl;
|
||||
|
||||
import cn.ac.iie.dao.BaseArangoData;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.ArangoCursor;
|
||||
import com.arangodb.entity.BaseEdgeDocument;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class ArangoEIpVisitFqdnToMap implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ArangoEIpVisitFqdnToMap.class);
|
||||
private ArangoDBConnect arangoDBConnect;
|
||||
private long finalMinTime;
|
||||
private long diffTime;
|
||||
private int threadNumber;
|
||||
|
||||
private ArangoEIpVisitFqdnToMap(){}
|
||||
|
||||
public ArangoEIpVisitFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) {
|
||||
this.arangoDBConnect = arangoDBConnect;
|
||||
this.finalMinTime = finalMinTime;
|
||||
this.diffTime = diffTime;
|
||||
this.threadNumber = threadNumber;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
String name = Thread.currentThread().getName();
|
||||
long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime;
|
||||
long minThreadTime = finalMinTime + threadNumber * diffTime;
|
||||
String query = "FOR doc IN R_VISIT_IP2FQDN filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc";
|
||||
LOG.info(name + ":" + query);
|
||||
long s = System.currentTimeMillis();
|
||||
ArangoCursor<BaseEdgeDocument> docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class);
|
||||
|
||||
if (docs != null){
|
||||
List<BaseEdgeDocument> baseDocuments = docs.asListRemaining();
|
||||
int i = 0;
|
||||
for (BaseEdgeDocument doc : baseDocuments) {
|
||||
String key = doc.getKey();
|
||||
BaseArangoData.e_Ip_Visit_Fqdn_Map.put(key, doc);
|
||||
i++;
|
||||
}
|
||||
LOG.info(name + ":共处理数据" + i);
|
||||
long l = System.currentTimeMillis();
|
||||
LOG.info(name + "运行时间:" + (l - s));
|
||||
}else {
|
||||
LOG.warn("查询R_VISIT_IP2FQDN异常,结果为空");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package cn.ac.iie.etl;
|
||||
|
||||
import cn.ac.iie.dao.BaseArangoData;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.ArangoCursor;
|
||||
import com.arangodb.entity.BaseDocument;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class ArangoVFqdnToMap implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ArangoVFqdnToMap.class);
|
||||
|
||||
private ArangoDBConnect arangoDBConnect;
|
||||
private long finalMinTime;
|
||||
private long diffTime;
|
||||
private int threadNumber;
|
||||
|
||||
private ArangoVFqdnToMap(){}
|
||||
|
||||
public ArangoVFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) {
|
||||
this.arangoDBConnect = arangoDBConnect;
|
||||
this.finalMinTime = finalMinTime;
|
||||
this.diffTime = diffTime;
|
||||
this.threadNumber = threadNumber;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
String name = Thread.currentThread().getName();
|
||||
long maxThreadTime = finalMinTime + (threadNumber + 1)* diffTime;
|
||||
long minThreadTime = finalMinTime + threadNumber * diffTime;
|
||||
String query = "FOR doc IN FQDN filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc";
|
||||
LOG.info(name+":"+query);
|
||||
long s = System.currentTimeMillis();
|
||||
ArangoCursor<BaseDocument> docs = arangoDBConnect.executorQuery(query, BaseDocument.class);
|
||||
|
||||
try {
|
||||
if (docs != null){
|
||||
List<BaseDocument> baseDocuments = docs.asListRemaining();
|
||||
int i = 0;
|
||||
for (BaseDocument doc:baseDocuments){
|
||||
String key = doc.getKey();
|
||||
BaseArangoData.v_Fqdn_Map.put(key,doc);
|
||||
i++;
|
||||
}
|
||||
LOG.info(name+":共处理数据"+ i);
|
||||
long l = System.currentTimeMillis();
|
||||
LOG.info(name+"运行时间:"+(l-s));
|
||||
}else {
|
||||
LOG.warn("获取VFqdn异常,结果为空");
|
||||
}
|
||||
}catch (Exception e){
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package cn.ac.iie.etl;
|
||||
|
||||
import cn.ac.iie.dao.BaseArangoData;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.ArangoCursor;
|
||||
import com.arangodb.entity.BaseDocument;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class ArangoVIpToMap implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ArangoVIpToMap.class);
|
||||
|
||||
private ArangoDBConnect arangoDBConnect;
|
||||
private long finalMinTime;
|
||||
private long diffTime;
|
||||
private int threadNumber;
|
||||
|
||||
private ArangoVIpToMap() {}
|
||||
|
||||
public ArangoVIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) {
|
||||
this.arangoDBConnect = arangoDBConnect;
|
||||
this.finalMinTime = finalMinTime;
|
||||
this.diffTime = diffTime;
|
||||
this.threadNumber = threadNumber;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
String name = Thread.currentThread().getName();
|
||||
long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime;
|
||||
long minThreadTime = finalMinTime + threadNumber * diffTime;
|
||||
String query = "FOR doc IN IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc";
|
||||
LOG.info(name + ":" + query);
|
||||
long s = System.currentTimeMillis();
|
||||
try {
|
||||
ArangoCursor<BaseDocument> docs = arangoDBConnect.executorQuery(query, BaseDocument.class);
|
||||
if (docs != null){
|
||||
List<BaseDocument> baseDocuments = docs.asListRemaining();
|
||||
int i = 0;
|
||||
for (BaseDocument doc : baseDocuments) {
|
||||
String key = doc.getKey();
|
||||
BaseArangoData.v_Ip_Map.put(key, doc);
|
||||
i++;
|
||||
}
|
||||
LOG.info(name + ":共处理数据" + i);
|
||||
long l = System.currentTimeMillis();
|
||||
LOG.info(name + "运行时间:" + (l - s));
|
||||
}else {
|
||||
LOG.warn("获取VIP异常,结果为空");
|
||||
}
|
||||
}catch (Exception e){
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,113 @@
|
||||
package cn.ac.iie.etl;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import cn.ac.iie.dao.BaseArangoData;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.entity.BaseEdgeDocument;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
public class UpdateEFqdnAddressIp implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(UpdateEFqdnAddressIp.class);
|
||||
private HashMap<String, BaseEdgeDocument> documentHashMap;
|
||||
|
||||
private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
|
||||
|
||||
public UpdateEFqdnAddressIp(HashMap<String, BaseEdgeDocument> documentHashMap) {
|
||||
this.documentHashMap = documentHashMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Set<String> keySet = documentHashMap.keySet();
|
||||
ArrayList<BaseEdgeDocument> docInsert = new ArrayList<>();
|
||||
ArrayList<BaseEdgeDocument> docUpdate = new ArrayList<>();
|
||||
int i = 0;
|
||||
try {
|
||||
for (String key : keySet) {
|
||||
BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null);
|
||||
if (newEdgeDocument != null) {
|
||||
i += 1;
|
||||
BaseEdgeDocument edgeDocument = BaseArangoData.e_Fqdn_Address_Ip_Map.getOrDefault(key, null);
|
||||
|
||||
Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME");
|
||||
long tlsCountTotal = Long.parseLong(newEdgeDocument.getAttribute("TLS_CNT_TOTAL").toString());
|
||||
long httpCountTotal = Long.parseLong(newEdgeDocument.getAttribute("HTTP_CNT_TOTAL").toString());
|
||||
|
||||
if (edgeDocument != null) {
|
||||
long tlsUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("TLS_CNT_TOTAL").toString());
|
||||
long httpUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("HTTP_CNT_TOTAL").toString());
|
||||
|
||||
edgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
edgeDocument.addAttribute("TLS_CNT_TOTAL", tlsCountTotal + tlsUpdateCountTotal);
|
||||
edgeDocument.addAttribute("HTTP_CNT_TOTAL", httpCountTotal + httpUpdateCountTotal);
|
||||
|
||||
ArrayList<Long> tlsCntRecent = (ArrayList<Long>) edgeDocument.getAttribute("TLS_CNT_RECENT");
|
||||
Long[] tlsCntRecentsSrc = tlsCntRecent.toArray(new Long[tlsCntRecent.size()]);
|
||||
// Long[] tlsCntRecentsSrc = (Long[]) edgeDocument.getAttribute("TLS_CNT_RECENT");
|
||||
Long[] tlsCntRecentsDst = new Long[7];
|
||||
System.arraycopy(tlsCntRecentsSrc, 0, tlsCntRecentsDst, 1, tlsCntRecentsSrc.length - 1);
|
||||
tlsCntRecentsDst[0] = tlsCountTotal;
|
||||
edgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst);
|
||||
|
||||
ArrayList<Long> httpCntRecent = (ArrayList<Long>) edgeDocument.getAttribute("HTTP_CNT_RECENT");
|
||||
Long[] httpCntRecentsSrc = httpCntRecent.toArray(new Long[httpCntRecent.size()]);
|
||||
// Long[] httpCntRecentsSrc = (Long[]) edgeDocument.getAttribute("HTTP_CNT_RECENT");
|
||||
Long[] httpCntRecentsDst = new Long[7];
|
||||
System.arraycopy(httpCntRecentsSrc, 0, httpCntRecentsDst, 1, httpCntRecentsDst.length - 1);
|
||||
httpCntRecentsDst[0] = httpCountTotal;
|
||||
edgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst);
|
||||
|
||||
ArrayList<String> distCipTotal = (ArrayList<String>) edgeDocument.getAttribute("DIST_CIP_TOTAL");
|
||||
String[] distCipTotalsSrc = distCipTotal.toArray(new String[distCipTotal.size()]);
|
||||
// String[] distCipTotalsSrc = (String[]) edgeDocument.getAttribute("DIST_CIP_TOTAL");
|
||||
|
||||
// ArrayList<String> distCipRecent = (ArrayList<String>) newEdgeDocument.getAttribute("DIST_CIP_RECENT");
|
||||
// String[] distCipRecentsSrc = distCipRecent.toArray(new String[distCipRecent.size()]);
|
||||
String[] distCipRecentsSrc = (String[]) newEdgeDocument.getAttribute("DIST_CIP_RECENT");
|
||||
|
||||
if (distCipTotalsSrc.length == 30) {
|
||||
HashSet<String> dIpSet = new HashSet<>();
|
||||
dIpSet.addAll(Arrays.asList(distCipRecentsSrc));
|
||||
dIpSet.addAll(Arrays.asList(distCipTotalsSrc));
|
||||
Object[] distCipTotals = dIpSet.toArray();
|
||||
if (distCipTotals.length > 30) {
|
||||
System.arraycopy(distCipTotals, 0, distCipTotals, 0, 30);
|
||||
}
|
||||
edgeDocument.addAttribute("DIST_CIP_TOTAL", distCipTotals);
|
||||
}
|
||||
edgeDocument.addAttribute("DIST_CIP_RECENT", distCipRecentsSrc);
|
||||
|
||||
// docUpdate.add(edgeDocument);
|
||||
docInsert.add(edgeDocument);
|
||||
} else {
|
||||
long[] tlsCntRecentsDst = new long[7];
|
||||
tlsCntRecentsDst[0] = tlsCountTotal;
|
||||
newEdgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst);
|
||||
|
||||
long[] httpCntRecentsDst = new long[7];
|
||||
httpCntRecentsDst[0] = httpCountTotal;
|
||||
newEdgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst);
|
||||
|
||||
docInsert.add(newEdgeDocument);
|
||||
}
|
||||
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
|
||||
// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_LOCATE_FQDN2IP");
|
||||
arangoManger.overwrite(docInsert,"R_LOCATE_FQDN2IP");
|
||||
LOG.info("更新R_LOCATE_FQDN2IP:" + i);
|
||||
i = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (i != 0) {
|
||||
// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_LOCATE_FQDN2IP");
|
||||
arangoManger.overwrite(docInsert,"R_LOCATE_FQDN2IP");
|
||||
LOG.info("更新R_LOCATE_FQDN2IP:" + i);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
package cn.ac.iie.etl;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import cn.ac.iie.dao.BaseArangoData;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.entity.BaseEdgeDocument;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Set;
|
||||
|
||||
public class UpdateEIpVisitFqdn implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(UpdateEIpVisitFqdn.class);
|
||||
private HashMap<String, BaseEdgeDocument> documentHashMap;
|
||||
|
||||
private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
|
||||
|
||||
public UpdateEIpVisitFqdn(HashMap<String, BaseEdgeDocument> documentHashMap) {
|
||||
this.documentHashMap = documentHashMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Set<String> keySet = documentHashMap.keySet();
|
||||
ArrayList<BaseEdgeDocument> docInsert = new ArrayList<>();
|
||||
ArrayList<BaseEdgeDocument> docUpdate = new ArrayList<>();
|
||||
int i = 0;
|
||||
try {
|
||||
for (String key : keySet) {
|
||||
|
||||
|
||||
BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null);
|
||||
if (newEdgeDocument != null) {
|
||||
i += 1;
|
||||
BaseEdgeDocument edgeDocument = BaseArangoData.e_Ip_Visit_Fqdn_Map.getOrDefault(key, null);
|
||||
|
||||
Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME");
|
||||
long tlsCountTotal = Long.parseLong(newEdgeDocument.getAttribute("TLS_CNT_TOTAL").toString());
|
||||
long httpCountTotal = Long.parseLong(newEdgeDocument.getAttribute("HTTP_CNT_TOTAL").toString());
|
||||
|
||||
if (edgeDocument != null) {
|
||||
long tlsUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("TLS_CNT_TOTAL").toString());
|
||||
long httpUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("HTTP_CNT_TOTAL").toString());
|
||||
|
||||
edgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
edgeDocument.addAttribute("TLS_CNT_TOTAL", tlsCountTotal + tlsUpdateCountTotal);
|
||||
edgeDocument.addAttribute("HTTP_CNT_TOTAL", httpCountTotal + httpUpdateCountTotal);
|
||||
|
||||
ArrayList<Long> tlsCntRecent = (ArrayList<Long>) edgeDocument.getAttribute("TLS_CNT_RECENT");
|
||||
Long[] tlsCntRecentsSrc = tlsCntRecent.toArray(new Long[tlsCntRecent.size()]);
|
||||
// Long[] tlsCntRecentsSrc = (Long[]) edgeDocument.getAttribute("TLS_CNT_RECENT");
|
||||
Long[] tlsCntRecentsDst = new Long[7];
|
||||
System.arraycopy(tlsCntRecentsSrc, 0, tlsCntRecentsDst, 1, tlsCntRecentsSrc.length - 1);
|
||||
tlsCntRecentsDst[0] = tlsCountTotal;
|
||||
edgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst);
|
||||
|
||||
ArrayList<Long> httpCntRecent = (ArrayList<Long>) edgeDocument.getAttribute("HTTP_CNT_RECENT");
|
||||
Long[] httpCntRecentsSrc = httpCntRecent.toArray(new Long[httpCntRecent.size()]);
|
||||
// Long[] httpCntRecentsSrc = (Long[]) edgeDocument.getAttribute("HTTP_CNT_RECENT");
|
||||
Long[] httpCntRecentsDst = new Long[7];
|
||||
System.arraycopy(httpCntRecentsSrc, 0, httpCntRecentsDst, 1, httpCntRecentsDst.length - 1);
|
||||
httpCntRecentsDst[0] = httpCountTotal;
|
||||
edgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst);
|
||||
|
||||
// docUpdate.add(edgeDocument);
|
||||
docInsert.add(edgeDocument);
|
||||
} else {
|
||||
long[] tlsCntRecentsDst = new long[7];
|
||||
tlsCntRecentsDst[0] = tlsCountTotal;
|
||||
newEdgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst);
|
||||
|
||||
long[] httpCntRecentsDst = new long[7];
|
||||
httpCntRecentsDst[0] = httpCountTotal;
|
||||
newEdgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst);
|
||||
|
||||
docInsert.add(newEdgeDocument);
|
||||
}
|
||||
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
|
||||
// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_VISIT_IP2FQDN");
|
||||
arangoManger.overwrite(docInsert,"R_VISIT_IP2FQDN");
|
||||
LOG.info("更新R_VISIT_IP2FQDN:" + i);
|
||||
i = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (i != 0) {
|
||||
// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_VISIT_IP2FQDN");
|
||||
arangoManger.overwrite(docInsert,"R_VISIT_IP2FQDN");
|
||||
LOG.info("更新R_VISIT_IP2FQDN:" + i);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
package cn.ac.iie.etl;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import cn.ac.iie.dao.BaseArangoData;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.entity.BaseDocument;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Set;
|
||||
|
||||
public class UpdateVFqdn implements Runnable{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(UpdateVFqdn.class);
|
||||
|
||||
private ArrayList<BaseDocument> documentList;
|
||||
|
||||
private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
|
||||
|
||||
public UpdateVFqdn(ArrayList<BaseDocument> documentList) {
|
||||
this.documentList = documentList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
ArrayList<BaseDocument> docInsert = new ArrayList<>();
|
||||
ArrayList<BaseDocument> docUpdate = new ArrayList<>();
|
||||
int i = 0;
|
||||
try {
|
||||
for (BaseDocument newDocument:documentList){
|
||||
String key = newDocument.getKey();
|
||||
if (!key.equals("")){
|
||||
i += 1;
|
||||
BaseDocument document = BaseArangoData.v_Fqdn_Map.getOrDefault(key, null);
|
||||
if (document != null){
|
||||
Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME");
|
||||
document.addAttribute("LAST_FOUND_TIME",lastFoundTime);
|
||||
// docUpdate.add(document);
|
||||
docInsert.add(document);
|
||||
}else {
|
||||
docInsert.add(newDocument);
|
||||
}
|
||||
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
|
||||
// arangoManger.insertAndUpdate(docInsert,docUpdate,"FQDN");
|
||||
arangoManger.overwrite(docInsert,"FQDN");
|
||||
LOG.info("更新FQDN:"+i);
|
||||
i = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (i != 0){
|
||||
// arangoManger.insertAndUpdate(docInsert,docUpdate,"FQDN");
|
||||
arangoManger.overwrite(docInsert,"FQDN");
|
||||
LOG.info("更新FQDN:"+i);
|
||||
}
|
||||
}catch (Exception e){
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
60
IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVIP.java
Normal file
60
IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVIP.java
Normal file
@@ -0,0 +1,60 @@
|
||||
package cn.ac.iie.etl;
|
||||
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import cn.ac.iie.dao.BaseArangoData;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.entity.BaseDocument;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class UpdateVIP implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(UpdateVIP.class);
|
||||
|
||||
private ArrayList<BaseDocument> documentList;
|
||||
|
||||
private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
|
||||
|
||||
public UpdateVIP(ArrayList<BaseDocument> documentList) {
|
||||
this.documentList = documentList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
ArrayList<BaseDocument> docInsert = new ArrayList<>();
|
||||
ArrayList<BaseDocument> docUpdate = new ArrayList<>();
|
||||
int i = 0;
|
||||
try {
|
||||
for (BaseDocument newDocument:documentList){
|
||||
String key = newDocument.getKey();
|
||||
if (!key.equals("")){
|
||||
i += 1;
|
||||
BaseDocument document = BaseArangoData.v_Ip_Map.getOrDefault(key, null);
|
||||
if (document != null){
|
||||
Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME");
|
||||
document.addAttribute("LAST_FOUND_TIME",lastFoundTime);
|
||||
// docUpdate.add(document);
|
||||
docInsert.add(document);
|
||||
}else {
|
||||
docInsert.add(newDocument);
|
||||
}
|
||||
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
|
||||
// arangoManger.insertAndUpdate(docInsert,docUpdate,"IP");
|
||||
arangoManger.overwrite(docInsert,"IP");
|
||||
LOG.info("更新IP:"+i);
|
||||
i = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (i != 0){
|
||||
// arangoManger.insertAndUpdate(docInsert,docUpdate,"IP");
|
||||
arangoManger.overwrite(docInsert,"IP");
|
||||
LOG.info("更新IP:"+i);
|
||||
}
|
||||
}catch (Exception e){
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
package cn.ac.iie.test;
|
||||
|
||||
import cn.ac.iie.dao.BaseArangoData;
|
||||
import cn.ac.iie.dao.BaseClickhouseData;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import cn.ac.iie.utils.ExecutorThreadPool;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class IpLearningApplicationTest {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(IpLearningApplicationTest.class);
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
long startA = System.currentTimeMillis();
|
||||
BaseArangoData.BaseVFqdnDataMap();
|
||||
BaseArangoData.BaseVIpDataMap();
|
||||
BaseArangoData.BaseEFqdnAddressIpDataMap();
|
||||
BaseArangoData.BaseEIpVisitFqdnDataMap();
|
||||
|
||||
|
||||
ExecutorThreadPool.shutdown();
|
||||
ExecutorThreadPool.awaitThreadTask();
|
||||
long lastA = System.currentTimeMillis();
|
||||
LOG.info("读取ArangoDb时间:"+(lastA - startA));
|
||||
|
||||
long startC = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(4);
|
||||
|
||||
new Thread(() -> {
|
||||
BaseClickhouseData.BaseVFqdn();
|
||||
countDownLatch.countDown();
|
||||
}).start();
|
||||
|
||||
new Thread(() -> {
|
||||
BaseClickhouseData.BaseVIp();
|
||||
countDownLatch.countDown();
|
||||
}).start();
|
||||
|
||||
new Thread(() -> {
|
||||
BaseClickhouseData.BaseEFqdnAddressIp();
|
||||
countDownLatch.countDown();
|
||||
}).start();
|
||||
|
||||
new Thread(() -> {
|
||||
BaseClickhouseData.BaseEIpVisitFqdn();
|
||||
countDownLatch.countDown();
|
||||
}).start();
|
||||
|
||||
try {
|
||||
countDownLatch.await();
|
||||
LOG.info("主线程等待完毕");
|
||||
}catch (Exception e){
|
||||
LOG.error("主线程阻塞异常:\n"+e.toString());
|
||||
}
|
||||
|
||||
// BaseClickhouseData.BaseEFqdnAddressIp();
|
||||
long lastC = System.currentTimeMillis();
|
||||
LOG.info("更新ArangoDb时间:"+(lastC - startC));
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}finally {
|
||||
ArangoDBConnect.clean();
|
||||
}
|
||||
|
||||
LOG.info("v_Fqdn_Map大小:"+BaseArangoData.v_Fqdn_Map.size());
|
||||
LOG.info("v_Ip_Map大小:"+BaseArangoData.v_Ip_Map.size());
|
||||
LOG.info("e_Fqdn_Address_Ip_Map大小:"+BaseArangoData.e_Fqdn_Address_Ip_Map.size());
|
||||
LOG.info("e_Ip_Visit_Fqdn_Map大小:"+BaseArangoData.e_Ip_Visit_Fqdn_Map.size());
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
package cn.ac.iie.test;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import com.arangodb.ArangoCursor;
|
||||
import com.arangodb.ArangoDB;
|
||||
import com.arangodb.ArangoDatabase;
|
||||
import com.arangodb.entity.BaseDocument;
|
||||
import com.arangodb.model.AqlQueryOptions;
|
||||
import com.arangodb.util.MapBuilder;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ReadArangoDBThreadTest {
|
||||
|
||||
private static ConcurrentHashMap<String,BaseDocument> fqdnMap = new ConcurrentHashMap<String,BaseDocument>();
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArangoDB arangoDB = new ArangoDB.Builder()
|
||||
.maxConnections(ApplicationConfig.THREAD_POOL_NUMBER)
|
||||
.host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT)
|
||||
.user(ApplicationConfig.ARANGODB_USER)
|
||||
.password(ApplicationConfig.ARANGODB_PASSWORD)
|
||||
.build();
|
||||
Map<String, Object> bindVars = new MapBuilder().get();
|
||||
AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL);
|
||||
|
||||
String sql = "LET FQDN = (FOR doc IN V_FQDN RETURN doc) return {max_time:MAX(FQDN[*].FQDN_FIRST_FOUND_TIME),min_time:MIN(FQDN[*].FQDN_FIRST_FOUND_TIME)}";
|
||||
// String sql = "LET IP = (FOR doc IN V_IP RETURN doc) return {max_time:MAX(IP[*].FIRST_FOUND_TIME),min_time:MIN(IP[*].FIRST_FOUND_TIME)}";
|
||||
final ArangoDatabase db = arangoDB.db("insert_iplearn_index");
|
||||
long startTime = System.currentTimeMillis();
|
||||
ArangoCursor<BaseDocument> timeDoc = db.query(sql, bindVars, options, BaseDocument.class);
|
||||
long maxTime =0L;
|
||||
long minTime =0L;
|
||||
while (timeDoc.hasNext()){
|
||||
BaseDocument doc = timeDoc.next();
|
||||
maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER;
|
||||
minTime = Long.parseLong(doc.getAttribute("min_time").toString());
|
||||
}
|
||||
long lastTime = System.currentTimeMillis();
|
||||
System.out.println("查询最大最小时间用时:"+(lastTime-startTime));
|
||||
System.out.println(maxTime + "--" + minTime);
|
||||
final long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER;
|
||||
ExecutorService pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER);
|
||||
|
||||
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
|
||||
// long finalMaxTime = maxTime;
|
||||
final long finalMinTime = minTime;
|
||||
pool.execute(new Runnable() {
|
||||
|
||||
public void run() {
|
||||
String name = Thread.currentThread().getName();
|
||||
ArangoDatabase insert_iplearn_index = arangoDB.db("insert_iplearn_index");
|
||||
Map<String, Object> bindVars = new MapBuilder().get();
|
||||
AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL);
|
||||
String[] split = name.split("-");
|
||||
Long threadNum = Long.parseLong(split[3]);
|
||||
long maxThreadTime = finalMinTime + threadNum * diffTime;
|
||||
long minThreadTime = finalMinTime + (threadNum-1)*diffTime;
|
||||
String query = "FOR doc IN V_FQDN filter doc.FQDN_FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FQDN_FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc";
|
||||
// String query = "FOR doc IN V_IP filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc";
|
||||
System.out.println(name+":"+query);
|
||||
long s = System.currentTimeMillis();
|
||||
ArangoCursor<BaseDocument> fqdnDoc = insert_iplearn_index.query(query, bindVars, options, BaseDocument.class);
|
||||
List<BaseDocument> baseDocuments = fqdnDoc.asListRemaining();
|
||||
int i = 0;
|
||||
for (BaseDocument doc:baseDocuments){
|
||||
String key = doc.getKey();
|
||||
// System.out.println(name+":"+key);
|
||||
fqdnMap.put(key,doc);
|
||||
i++;
|
||||
}
|
||||
/*
|
||||
while (fqdnDoc.hasNext()){
|
||||
BaseDocument doc = fqdnDoc.next();
|
||||
}
|
||||
*/
|
||||
System.out.println(name+":"+ i);
|
||||
long l = System.currentTimeMillis();
|
||||
System.out.println(name+"运行时间:"+(l-s));
|
||||
}
|
||||
});
|
||||
}
|
||||
pool.shutdown();
|
||||
while (!pool.awaitTermination(20, TimeUnit.SECONDS)){
|
||||
|
||||
}
|
||||
System.out.println(fqdnMap.size());
|
||||
arangoDB.shutdown();
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,116 @@
|
||||
package cn.ac.iie.utils;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import com.arangodb.ArangoCollection;
|
||||
import com.arangodb.ArangoCursor;
|
||||
import com.arangodb.ArangoDB;
|
||||
import com.arangodb.ArangoDatabase;
|
||||
import com.arangodb.entity.DocumentCreateEntity;
|
||||
import com.arangodb.entity.ErrorEntity;
|
||||
import com.arangodb.entity.MultiDocumentEntity;
|
||||
import com.arangodb.model.AqlQueryOptions;
|
||||
import com.arangodb.model.DocumentCreateOptions;
|
||||
import com.arangodb.util.MapBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
public class ArangoDBConnect {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class);
|
||||
private static ArangoDB arangoDB = null;
|
||||
private static ArangoDBConnect conn = null;
|
||||
static {
|
||||
getArangoDB();
|
||||
}
|
||||
|
||||
private static void getArangoDB(){
|
||||
arangoDB = new ArangoDB.Builder()
|
||||
.maxConnections(ApplicationConfig.THREAD_POOL_NUMBER)
|
||||
.host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT)
|
||||
.user(ApplicationConfig.ARANGODB_USER)
|
||||
.password(ApplicationConfig.ARANGODB_PASSWORD)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static synchronized ArangoDBConnect getInstance(){
|
||||
if (null == conn){
|
||||
conn = new ArangoDBConnect();
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
public ArangoDatabase getDatabase(){
|
||||
return arangoDB.db(ApplicationConfig.ARANGODB_DB_NAME);
|
||||
}
|
||||
|
||||
public static void clean(){
|
||||
try {
|
||||
if (arangoDB != null){
|
||||
arangoDB.shutdown();
|
||||
}
|
||||
}catch (Exception e){
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public <T> ArangoCursor<T> executorQuery(String query,Class<T> type){
|
||||
ArangoDatabase database = getDatabase();
|
||||
Map<String, Object> bindVars = new MapBuilder().get();
|
||||
AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL);
|
||||
try {
|
||||
return database.query(query, bindVars, options, type);
|
||||
}catch (Exception e){
|
||||
LOG.error(e.getMessage());
|
||||
return null;
|
||||
}finally {
|
||||
bindVars.clear();
|
||||
}
|
||||
}
|
||||
|
||||
public <T> void insertAndUpdate(ArrayList<T> docInsert,ArrayList<T> docUpdate,String collectionName){
|
||||
ArangoDatabase database = getDatabase();
|
||||
try {
|
||||
ArangoCollection collection = database.collection(collectionName);
|
||||
if (!docInsert.isEmpty()){
|
||||
DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
|
||||
// documentCreateOptions.overwrite(true);
|
||||
collection.insertDocuments(docInsert, documentCreateOptions);
|
||||
}
|
||||
if (!docUpdate.isEmpty()){
|
||||
collection.replaceDocuments(docUpdate);
|
||||
}
|
||||
}catch (Exception e){
|
||||
LOG.error("更新失败\n"+e.toString());
|
||||
}finally {
|
||||
docInsert.clear();
|
||||
docInsert.clear();
|
||||
}
|
||||
}
|
||||
|
||||
public <T> void overwrite(ArrayList<T> docOverwrite,String collectionName){
|
||||
ArangoDatabase database = getDatabase();
|
||||
try {
|
||||
ArangoCollection collection = database.collection(collectionName);
|
||||
if (!docOverwrite.isEmpty()){
|
||||
DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
|
||||
documentCreateOptions.overwrite(true);
|
||||
documentCreateOptions.returnNew(true);
|
||||
documentCreateOptions.returnOld(true);
|
||||
MultiDocumentEntity<DocumentCreateEntity<T>> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
|
||||
Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
|
||||
for (ErrorEntity errorEntity:errors){
|
||||
LOG.error("写入arangoDB异常:"+errorEntity.getErrorMessage());
|
||||
}
|
||||
|
||||
}
|
||||
}catch (Exception e){
|
||||
LOG.error(e.toString());
|
||||
}finally {
|
||||
docOverwrite.clear();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
package cn.ac.iie.utils;
|
||||
|
||||
import com.alibaba.druid.pool.DruidDataSource;
|
||||
import com.alibaba.druid.pool.DruidPooledConnection;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Properties;
|
||||
|
||||
public class ClickhouseConnect {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ClickhouseConnect.class);
|
||||
private static DruidDataSource dataSource = null;
|
||||
private static ClickhouseConnect dbConnect = null;
|
||||
private static Properties props = new Properties();
|
||||
|
||||
static {
|
||||
getDbConnect();
|
||||
}
|
||||
|
||||
private static void getDbConnect() {
|
||||
try {
|
||||
if (dataSource == null) {
|
||||
dataSource = new DruidDataSource();
|
||||
props.load(ClickhouseConnect.class.getClassLoader().getResourceAsStream("clickhouse.properties"));
|
||||
//设置连接参数
|
||||
dataSource.setUrl("jdbc:clickhouse://" + props.getProperty("db.id"));
|
||||
dataSource.setDriverClassName(props.getProperty("drivers"));
|
||||
dataSource.setUsername(props.getProperty("mdb.user"));
|
||||
dataSource.setPassword(props.getProperty("mdb.password"));
|
||||
//配置初始化大小、最小、最大
|
||||
dataSource.setInitialSize(Integer.parseInt(props.getProperty("initialsize")));
|
||||
dataSource.setMinIdle(Integer.parseInt(props.getProperty("minidle")));
|
||||
dataSource.setMaxActive(Integer.parseInt(props.getProperty("maxactive")));
|
||||
//配置获取连接等待超时的时间
|
||||
dataSource.setMaxWait(30000);
|
||||
//配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
|
||||
dataSource.setTimeBetweenEvictionRunsMillis(2000);
|
||||
//防止过期
|
||||
dataSource.setValidationQuery("SELECT 1");
|
||||
dataSource.setTestWhileIdle(true);
|
||||
dataSource.setTestOnBorrow(true);
|
||||
dataSource.setKeepAlive(true);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error(e.getMessage());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据库连接池单例
|
||||
*
|
||||
* @return dbConnect
|
||||
*/
|
||||
public static synchronized ClickhouseConnect getInstance() {
|
||||
if (null == dbConnect) {
|
||||
dbConnect = new ClickhouseConnect();
|
||||
}
|
||||
return dbConnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回druid数据库连接
|
||||
*
|
||||
* @return 连接
|
||||
* @throws SQLException sql异常
|
||||
*/
|
||||
public DruidPooledConnection getConnection() throws SQLException {
|
||||
return dataSource.getConnection();
|
||||
}
|
||||
|
||||
/**
|
||||
* 清空PreparedStatement、Connection对象,未定义的置空。
|
||||
*
|
||||
* @param pstmt PreparedStatement对象
|
||||
* @param connection Connection对象
|
||||
*/
|
||||
public void clear(Statement pstmt, Connection connection) {
|
||||
try {
|
||||
if (pstmt != null) {
|
||||
pstmt.close();
|
||||
}
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public ResultSet executorQuery(String query,Connection connection,Statement pstm){
|
||||
// Connection connection = null;
|
||||
// Statement pstm = null;
|
||||
try {
|
||||
connection = getConnection();
|
||||
pstm = connection.createStatement();
|
||||
return pstm.executeQuery(query);
|
||||
}catch (Exception e){
|
||||
LOG.error(e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package cn.ac.iie.utils;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
public class ConfigUtils {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ConfigUtils.class);
|
||||
private static Properties propCommon = new Properties();
|
||||
|
||||
public static String getStringProperty(String key) {
|
||||
return propCommon.getProperty(key);
|
||||
}
|
||||
|
||||
|
||||
public static Integer getIntProperty(String key) {
|
||||
return Integer.parseInt(propCommon.getProperty(key));
|
||||
}
|
||||
|
||||
public static Long getLongProperty(String key) {
|
||||
return Long.parseLong(propCommon.getProperty(key));
|
||||
}
|
||||
|
||||
public static Boolean getBooleanProperty(String key) {
|
||||
return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
|
||||
}
|
||||
|
||||
static {
|
||||
try {
|
||||
propCommon.load(ConfigUtils.class.getClassLoader().getResourceAsStream("application.properties"));
|
||||
LOG.info("application.properties加载成功");
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
propCommon = null;
|
||||
LOG.error("配置加载失败");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
package cn.ac.iie.utils;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ExecutorThreadPool {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ExecutorThreadPool.class);
|
||||
private static ExecutorService pool = null ;
|
||||
private static ExecutorThreadPool poolExecutor = null;
|
||||
|
||||
static {
|
||||
getThreadPool();
|
||||
}
|
||||
|
||||
private static void getThreadPool(){
|
||||
pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER);
|
||||
}
|
||||
|
||||
public static ExecutorThreadPool getInstance(){
|
||||
if (null == poolExecutor){
|
||||
poolExecutor = new ExecutorThreadPool();
|
||||
}
|
||||
return poolExecutor;
|
||||
}
|
||||
|
||||
public void executor(Runnable command){
|
||||
pool.execute(command);
|
||||
}
|
||||
|
||||
public static void awaitThreadTask(){
|
||||
try {
|
||||
while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) {
|
||||
LOG.warn("线程池没有关闭");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public static void shutdown(){
|
||||
pool.shutdown();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static Long getThreadNumber(){
|
||||
String name = Thread.currentThread().getName();
|
||||
String[] split = name.split("-");
|
||||
return Long.parseLong(split[3]);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
17
IP-learning-graph/src/main/resources/application.properties
Normal file
17
IP-learning-graph/src/main/resources/application.properties
Normal file
@@ -0,0 +1,17 @@
|
||||
#arangoDB参数配置
|
||||
arangoDB.host=192.168.40.182
|
||||
arangoDB.port=8529
|
||||
arangoDB.user=root
|
||||
arangoDB.password=111111
|
||||
#arangoDB.DB.name=ip-learning-test
|
||||
arangoDB.DB.name=ip-learning-test
|
||||
arangoDB.batch=100000
|
||||
arangoDB.ttl=3600
|
||||
|
||||
update.arango.batch=10000
|
||||
|
||||
thread.pool.number=10
|
||||
thread.await.termination.time=10
|
||||
|
||||
read.clickhouse.max.time=1593162456
|
||||
read.clickhouse.min.time=1592879247
|
||||
@@ -0,0 +1,9 @@
|
||||
drivers=ru.yandex.clickhouse.ClickHouseDriver
|
||||
#db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000
|
||||
db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000
|
||||
mdb.user=default
|
||||
#mdb.password=ceiec2019
|
||||
mdb.password=111111
|
||||
initialsize=1
|
||||
minidle=1
|
||||
maxactive=50
|
||||
24
IP-learning-graph/src/main/resources/log4j.properties
Normal file
24
IP-learning-graph/src/main/resources/log4j.properties
Normal file
@@ -0,0 +1,24 @@
|
||||
######################### logger ##############################
|
||||
log4j.logger.org.apache.http=OFF
|
||||
log4j.logger.org.apache.http.wire=OFF
|
||||
|
||||
#Log4j
|
||||
log4j.rootLogger=info,console,file
|
||||
# <20><><EFBFBD><EFBFBD>̨<EFBFBD><CCA8>־<EFBFBD><D6BE><EFBFBD><EFBFBD>
|
||||
log4j.appender.console=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.console.Threshold=info
|
||||
log4j.appender.console.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
|
||||
|
||||
# <20>ļ<EFBFBD><C4BC><EFBFBD>־<EFBFBD><D6BE><EFBFBD><EFBFBD>
|
||||
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
|
||||
log4j.appender.file.Threshold=info
|
||||
log4j.appender.file.encoding=UTF-8
|
||||
log4j.appender.file.Append=true
|
||||
#·<><C2B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>·<EFBFBD><C2B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ز<EFBFBD><D8B2><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ӧ<EFBFBD><D3A6>Ŀ<EFBFBD><C4BF>
|
||||
#log4j.appender.file.file=/home/ceiec/iplearning/logs/ip-learning-application.log
|
||||
log4j.appender.file.file=./logs/ip-learning-application.log
|
||||
log4j.appender.file.DatePattern='.'yyyy-MM-dd
|
||||
log4j.appender.file.layout=org.apache.log4j.PatternLayout
|
||||
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
|
||||
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
|
||||
13
IP-learning-graph/src/test/java/cn/ac/iie/ArrayTest.java
Normal file
13
IP-learning-graph/src/test/java/cn/ac/iie/ArrayTest.java
Normal file
@@ -0,0 +1,13 @@
|
||||
package cn.ac.iie;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public class ArrayTest {
|
||||
public static void main(String[] args) {
|
||||
long[] longs = {1, 2, 3, 4, 5, 6, 7};
|
||||
long[] longs1 = new long[7];
|
||||
System.arraycopy(longs, 0, longs1, 1, longs.length - 1);
|
||||
longs1[0] = 8;
|
||||
System.out.println(Arrays.toString(longs1));
|
||||
}
|
||||
}
|
||||
50
IP-learning-graph/src/test/java/cn/ac/iie/IpTest.java
Normal file
50
IP-learning-graph/src/test/java/cn/ac/iie/IpTest.java
Normal file
@@ -0,0 +1,50 @@
|
||||
package cn.ac.iie;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class IpTest {
|
||||
public static void main(String[] args) throws UnknownHostException {
|
||||
/*
|
||||
String ipStr = "192.168.40.152";
|
||||
Pattern pattern = Pattern.compile("^[\\d]*$");
|
||||
String[] split = ipStr.split("\\.");
|
||||
for (String s:split){
|
||||
System.out.println(s);
|
||||
System.out.println(pattern.matcher(s).matches());
|
||||
}
|
||||
*/
|
||||
// String ip = "17.57.145.7";
|
||||
String ip = "pixel.rubiconproject.com";
|
||||
// String ip = "113.200.17.239";
|
||||
System.out.println(ip.hashCode());
|
||||
int hash = Math.abs(ip.hashCode());
|
||||
int i = hash % ApplicationConfig.THREAD_POOL_NUMBER;
|
||||
System.out.println(i);
|
||||
// String[] ipArr = ipStr.split("\\.");
|
||||
// long ipLong = (Long.valueOf(ipArr[0]) << 24) + (Long.valueOf(ipArr[1]) << 16) + (Long.valueOf(ipArr[2]) << 8) + (Long.valueOf(ipArr[3]));
|
||||
// System.out.println(ipLong);
|
||||
//
|
||||
//
|
||||
// StringBuffer ipBf = new StringBuffer();
|
||||
// ipBf.append(ipLong >>> 24).append(".");
|
||||
// ipBf.append((ipLong >>> 16) & 0xFF).append(".");
|
||||
// ipBf.append((ipLong >>> 8) & 0xFF).append(".");
|
||||
// ipBf.append(ipLong & 0xFF);
|
||||
// String ip = ipBf.toString();
|
||||
// System.out.println(ip);
|
||||
//
|
||||
// System.out.println("---------------");
|
||||
// InetAddress byName = InetAddress.getByName("2001:470:19:790::38");
|
||||
// byte[] address = byName.getAddress();
|
||||
// for (byte b : address) {
|
||||
// System.out.println(b & 0xFF);
|
||||
// }
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
81
IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java
Normal file
81
IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java
Normal file
@@ -0,0 +1,81 @@
|
||||
package cn.ac.iie;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import cn.ac.iie.etl.UpdateEFqdnAddressIp;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import cn.ac.iie.utils.ClickhouseConnect;
|
||||
import com.alibaba.druid.pool.DruidPooledConnection;
|
||||
import com.arangodb.ArangoCollection;
|
||||
import com.arangodb.ArangoDatabase;
|
||||
import com.arangodb.entity.*;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.Statement;
|
||||
import java.util.*;
|
||||
|
||||
public class TestMap {
|
||||
|
||||
public static void main(String[] args) {
|
||||
/*
|
||||
long maxTime = 1592794800;
|
||||
long minTime = 1590112800;
|
||||
String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime+ " AND (common_schema_type = 'HTTP' or common_schema_type = 'SSL')";
|
||||
String sql = "SELECT common_schema_type,http_host,ssl_sni,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL,groupArray(30)(common_server_ip) as DIST_CIP_RECENT FROM tsg_galaxy_v3.connection_record_log WHERE "+where+" GROUP BY common_schema_type,http_host,ssl_sni";
|
||||
System.out.println(sql);
|
||||
long start = System.currentTimeMillis();
|
||||
try {
|
||||
DruidPooledConnection connection = ClickhouseConnect.getInstance().getConnection();
|
||||
Statement statement = connection.createStatement();
|
||||
ResultSet resultSet = statement.executeQuery(sql);
|
||||
HashMap<String, HashMap<String,Long>> schemaHashMap = new HashMap<>();
|
||||
while (resultSet.next()) {
|
||||
String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray();
|
||||
for (String s:distCipRecents){
|
||||
System.out.print(s+",");
|
||||
}
|
||||
System.out.println();
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
*/
|
||||
// long[] longs = new long[]{1,2,3,4,5,6,7};
|
||||
/*
|
||||
long[] longs = new long[]{1,2,3,4};
|
||||
long[] longs1 = new long[7];
|
||||
System.arraycopy(longs,0,longs1,1,longs.length-1);
|
||||
longs1[0] = 0;
|
||||
for (long c:longs1){
|
||||
System.out.println(c);
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
String[] distCipRecents = new String[]{"2.3"};
|
||||
ArrayList<BaseDocument> baseEdgeDocuments = new ArrayList<>();
|
||||
|
||||
BaseDocument newDoc = new BaseDocument();
|
||||
newDoc.setKey("111$#$");
|
||||
// newDoc.setKey("11111");
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", 123);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", 123);
|
||||
|
||||
BaseDocument document = new BaseDocument();
|
||||
document.setKey("4399pk.com2142379111");
|
||||
document.addAttribute("FIRST_FOUND_TIME",1592743297);
|
||||
document.addAttribute("LAST_FOUND_TIME",1592743297);
|
||||
|
||||
baseEdgeDocuments.add(newDoc);
|
||||
baseEdgeDocuments.add(document);
|
||||
|
||||
ArangoDBConnect instance = ArangoDBConnect.getInstance();
|
||||
|
||||
instance.overwrite(baseEdgeDocuments,"FQDN");
|
||||
|
||||
ArangoDBConnect.clean();
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user