修改全局变量为本地变量

This commit is contained in:
wanglihui
2020-08-21 18:08:58 +08:00
parent 31e19d7a0f
commit db8e764e00
7 changed files with 254 additions and 1 deletions

View File

@@ -39,6 +39,12 @@
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-java-driver</artifactId>

View File

@@ -58,6 +58,32 @@ public class BaseArangoData {
}
}
public <T extends BaseDocument> ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> readHistoryData(String table, Class<T> type){
ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map = new ConcurrentHashMap<>();
try {
LOG.info("开始更新"+table);
long start = System.currentTimeMillis();
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
map.put(i,new ConcurrentHashMap<>());
}
CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
Long countTotal = getCountTotal(table);
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
String sql = getQuerySql(countTotal, i, table);
ReadHistoryArangoData<T> readHistoryArangoData =
new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table,countDownLatch);
threadPool.executor(readHistoryArangoData);
}
countDownLatch.await();
long last = System.currentTimeMillis();
LOG.info("读取"+table+" arangoDB 共耗时:"+(last-start));
}catch (Exception e){
e.printStackTrace();
LOG.error("读取历史数据失败 "+e.toString());
}
return map;
}
private long[] getTimeRange(String table){
long minTime = 0L;
long maxTime = 0L;

View File

@@ -73,5 +73,51 @@ public class BaseClickhouseData {
}
}
public <T extends BaseDocument> HashMap<Integer, HashMap<String,ArrayList<T>>> baseDocFromCk(Supplier<String> getSqlSupplier,
Function<ResultSet,T> formatResultFunc){
long start = System.currentTimeMillis();
HashMap<Integer, HashMap<String, ArrayList<T>>> newDataMap = initializeMap();
if (newDataMap == null){
return null;
}
String sql = getSqlSupplier.get();
try {
connection = manger.getConnection();
statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
int i = 0;
while (resultSet.next()) {
T newDoc = formatResultFunc.apply(resultSet);
if (newDoc != null) {
i+=1;
putMapByHashcode(newDoc, newDataMap);
}
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取"+i+"条数据,运行时间:" + (last - start));
}catch (Exception e){
e.printStackTrace();
LOG.error("获取原始数据失败 "+e.toString());
}finally {
manger.clear(statement,connection);
}
return newDataMap;
}
private <T extends BaseDocument> HashMap<Integer, HashMap<String,ArrayList<T>>> initializeMap(){
try {
HashMap<Integer, HashMap<String, ArrayList<T>>> newDataMap = new HashMap<>();
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
newDataMap.put(i, new HashMap<>());
}
return newDataMap;
}catch (Exception e){
e.printStackTrace();
LOG.error("数据初始化失败 "+e.toString());
return null;
}
}
}

View File

@@ -75,6 +75,41 @@ public class UpdateGraphData {
}
}
public void updateArango2(){
long start = System.currentTimeMillis();
try {
updateDocument("FQDN", Fqdn.class,BaseDocument.class,
ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
updateDocument("IP", Ip.class,BaseDocument.class,
ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
updateDocument("SUBSCRIBER", Subscriber.class,BaseDocument.class,
ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument);
updateDocument("R_LOCATE_FQDN2IP", LocateFqdn2Ip.class,BaseEdgeDocument.class,
ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument);
// updateDocument("R_VISIT_IP2FQDN",
// VisitIp2Fqdn.class,BaseEdgeDocument.class,
// ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument);
updateDocument("R_LOCATE_SUBSCRIBER2IP",
LocateSubscriber2Ip.class,BaseEdgeDocument.class,
ReadClickhouseData::getRelationshipSubsciberLocateIpSql,ReadClickhouseData::getRelationshipSubsciberLocateIpDocument);
long last = System.currentTimeMillis();
LOG.info("iplearning application运行完毕用时"+(last - start));
}catch (Exception e){
e.printStackTrace();
}finally {
arangoManger.clean();
pool.shutdown();
}
}
private <T extends BaseDocument> void updateDocument(HashMap<Integer, HashMap<String, ArrayList<T>>> newMap,
ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap,
@@ -116,5 +151,45 @@ public class UpdateGraphData {
}
}
private <T extends BaseDocument> void updateDocument(String collection,
Class<? extends Document<T>> taskType,
Class<T> docmentType,
Supplier<String> getSqlSupplier,
Function<ResultSet,T> formatResultFunc){
ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyData = baseArangoData.readHistoryData(collection, docmentType);
LOG.info(collection+" 读取clickhouse,封装结果集");
HashMap<Integer, HashMap<String, ArrayList<T>>> newData = baseClickhouseData.baseDocFromCk(getSqlSupplier, formatResultFunc);
try {
LOG.info(collection+" 开始更新");
long start = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
HashMap<String, ArrayList<T>> tmpNewMap = newData.get(i);
ConcurrentHashMap<String, T> tmpHisMap = historyData.get(i);
Constructor constructor = taskType.getConstructor(
HashMap.class,
ArangoDBConnect.class,
String.class,
ConcurrentHashMap.class,
CountDownLatch.class);
Document docTask = (Document)constructor.newInstance(tmpNewMap, arangoManger, collection, tmpHisMap, countDownLatch);
pool.executor(docTask);
}
countDownLatch.await();
long last = System.currentTimeMillis();
LOG.info(collection+" 更新完毕,共耗时:"+(last-start));
}catch (Exception e){
e.printStackTrace();
LOG.error("更新"+collection+"失败!!"+e.toString());
}finally {
newData.clear();
historyData.clear();
}
}
}

View File

@@ -6,7 +6,8 @@ public class IpLearningApplicationTest {
public static void main(String[] args) {
UpdateGraphData updateGraphData = new UpdateGraphData();
updateGraphData.updateArango();
// updateGraphData.updateArango();
updateGraphData.updateArango2();
}
}

View File

@@ -0,0 +1,53 @@
package cn.ac.iie;
import cn.ac.iie.dao.BaseArangoData;
import cn.ac.iie.utils.ArangoDBConnect;
import cn.ac.iie.utils.ExecutorThreadPool;
import com.arangodb.entity.BaseDocument;
import com.arangodb.entity.BaseEdgeDocument;
import org.junit.After;
import org.junit.Test;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
public class TestReadArango {
private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance();
private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
private static BaseArangoData baseArangoData = new BaseArangoData();
@Test
public void testReadFqdnFromArango() {
ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyData =
baseArangoData.readHistoryData("FQDN", BaseDocument.class);
printMap(historyData);
}
@Test
public void testReadFqdnLocIpFromArango() {
ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> ip =
baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", BaseEdgeDocument.class);
printMap(ip);
}
private <T extends BaseDocument> void printMap(ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyData) {
ConcurrentHashMap<String, T> map = historyData.get(2);
Enumeration<String> keys = map.keys();
while (keys.hasMoreElements()) {
String key = keys.nextElement();
T document = map.get(key);
System.out.println(document.toString());
}
}
@After
public void clearSource() {
pool.shutdown();
arangoManger.clean();
}
}

View File

@@ -0,0 +1,46 @@
package cn.ac.iie;
import cn.ac.iie.dao.BaseClickhouseData;
import cn.ac.iie.service.ingestion.ReadClickhouseData;
import com.arangodb.entity.BaseDocument;
import com.arangodb.entity.BaseEdgeDocument;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Set;
public class TestReadClickhouse {
private static BaseClickhouseData baseClickhouseData = new BaseClickhouseData();
@Test
public void testReadFqdnFromCk(){
HashMap<Integer, HashMap<String, ArrayList<BaseDocument>>> newData =
baseClickhouseData.baseDocFromCk(ReadClickhouseData::getVertexFqdnSql,
ReadClickhouseData::getVertexFqdnDocument);
printMap(newData);
}
@Test
public void testReadFqdnLocIpFromCk(){
HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> map =
baseClickhouseData.baseDocFromCk(ReadClickhouseData::getRelationshipFqdnAddressIpSql,
ReadClickhouseData::getRelationFqdnAddressIpDocument);
printMap(map);
}
private<T extends BaseDocument> void printMap(HashMap<Integer, HashMap<String, ArrayList<T>>> newData){
HashMap<String, ArrayList<T>> map = newData.get(1);
Set<String> strings = map.keySet();
for (String key:strings){
ArrayList<T> baseDocuments = map.get(key);
System.out.println(baseDocuments.get(0));
}
}
}