抽取读取clickhouse公共方法,修改merge函数接口。

This commit is contained in:
wanglihui
2020-07-23 19:09:12 +08:00
parent b3ec0bdfbf
commit 86ecd1b5df
16 changed files with 387 additions and 544 deletions

View File

@@ -19,20 +19,25 @@ import java.util.concurrent.CountDownLatch;
public class BaseArangoData {
private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class);
public static ConcurrentHashMap<String, BaseDocument> historyVertexFqdnMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseDocument> historyVertexIpMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseDocument> historyVertexSubscriberMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseEdgeDocument> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseEdgeDocument> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseEdgeDocument> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseDocument>> historyVertexFqdnMap = new ConcurrentHashMap<>();
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseDocument>> historyVertexIpMap = new ConcurrentHashMap<>();
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseDocument>> historyVertexSubscriberMap = new ConcurrentHashMap<>();
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>();
static ConcurrentHashMap<Integer,ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance();
private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
<T extends BaseDocument> void readHistoryData(String table, ConcurrentHashMap<String, T> map, Class<T> type){
<T extends BaseDocument> void readHistoryData(String table,ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map,Class<T> type){
try {
LOG.info("开始更新"+table);
long start = System.currentTimeMillis();
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){
map.put(i,new ConcurrentHashMap<>());
}
CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
long[] timeRange = getTimeRange(table);
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
@@ -43,7 +48,6 @@ public class BaseArangoData {
countDownLatch.await();
long last = System.currentTimeMillis();
LOG.info("读取"+table+" arangoDB 共耗时:"+(last-start));
LOG.info(table+" history Map大小为"+map.size());
}catch (Exception e){
e.printStackTrace();
}