diff --git a/ip-learning-java-test/.gitignore b/ip-learning-java-test/.gitignore index ac0cafa..5db5dd3 100644 --- a/ip-learning-java-test/.gitignore +++ b/ip-learning-java-test/.gitignore @@ -6,3 +6,4 @@ .idea *.iml target +logs/ diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/config/ApplicationConfig.java b/ip-learning-java-test/src/main/java/cn/ac/iie/config/ApplicationConfig.java index d559d07..6a867ee 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/config/ApplicationConfig.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/config/ApplicationConfig.java @@ -21,5 +21,11 @@ public class ApplicationConfig { public static final Long READ_CLICKHOUSE_MAX_TIME = ConfigUtils.getLongProperty("read.clickhouse.max.time"); public static final Long READ_CLICKHOUSE_MIN_TIME = ConfigUtils.getLongProperty("read.clickhouse.min.time"); + public static final Integer TIME_LIMIT_TYPE = ConfigUtils.getIntProperty("time.limit.type"); + public static final Integer UPDATE_INTERVAL = ConfigUtils.getIntProperty("update.interval"); + + public static final Integer DISTINCT_CLIENT_IP_NUM = ConfigUtils.getIntProperty("distinct.client.ip.num"); + public static final Integer RECENT_COUNT_HOUR = ConfigUtils.getIntProperty("recent.count.hour"); + } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java index fc7bf83..21638be 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -15,25 +15,30 @@ import java.util.concurrent.CountDownLatch; /** * 获取arangoDB历史数据 + * @author wlh */ public class BaseArangoData { private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class); - public static ConcurrentHashMap historyVertexFqdnMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap historyVertexIpMap = new ConcurrentHashMap<>(); - public static 
ConcurrentHashMap historyVertexSubscriberMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>(); + static ConcurrentHashMap> historyVertexFqdnMap = new ConcurrentHashMap<>(); + static ConcurrentHashMap> historyVertexIpMap = new ConcurrentHashMap<>(); + static ConcurrentHashMap> historyVertexSubscriberMap = new ConcurrentHashMap<>(); + static ConcurrentHashMap> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>(); + static ConcurrentHashMap> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>(); + static ConcurrentHashMap> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>(); + static ConcurrentHashMap> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>(); private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); - void readHistoryData(String table, ConcurrentHashMap map, Class type){ + void readHistoryData(String table, ConcurrentHashMap> map, Class type){ try { + LOG.info("开始更新"+table); long start = System.currentTimeMillis(); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){ + map.put(i,new ConcurrentHashMap<>()); + } CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); long[] timeRange = getTimeRange(table); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { @@ -44,7 +49,6 @@ public class BaseArangoData { countDownLatch.await(); long last = System.currentTimeMillis(); LOG.info("读取"+table+" arangoDB 共耗时:"+(last-start)); - LOG.info(table+" history Map大小为:"+map.size()); }catch (Exception e){ e.printStackTrace(); } @@ -81,7 
+85,7 @@ public class BaseArangoData { long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; long maxThreadTime = minTime + (threadNumber + 1)* diffTime; long minThreadTime = minTime + threadNumber * diffTime; - return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; + return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" limit 100 RETURN doc"; } } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index 49fde14..dfa8a5e 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -12,8 +12,10 @@ import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; +import java.util.function.Function; +import java.util.function.Supplier; -import static cn.ac.iie.service.read.ReadClickhouseData.*; +import static cn.ac.iie.service.read.ReadClickhouseData.putMapByHashcode; /** * 读取clickhouse数据,封装到map @@ -22,7 +24,6 @@ import static cn.ac.iie.service.read.ReadClickhouseData.*; public class BaseClickhouseData { private static final Logger LOG = LoggerFactory.getLogger(BaseClickhouseData.class); - private static ClickhouseConnect manger = ClickhouseConnect.getInstance(); static HashMap>> newVertexFqdnMap = new HashMap<>(); static HashMap>> newVertexIpMap = new HashMap<>(); static HashMap>> newVertexSubscriberMap = new HashMap<>(); @@ -31,139 +32,28 @@ public class BaseClickhouseData { static HashMap>> newRelationSubsciberLocateIpMap = new HashMap<>(); static HashMap>> newRelationFqdnSameFqdnMap = new HashMap<>(); + private static ClickhouseConnect manger = ClickhouseConnect.getInstance(); private DruidPooledConnection connection; private Statement 
statement; - void baseVertexFqdn() { - initializeMap(newVertexFqdnMap); - LOG.info("FQDN resultMap初始化完成"); - String sql = getVertexFqdnSql(); + void baseDocumentFromClickhouse(HashMap>> newMap, Supplier getSqlSupplier, Function formatResultFunc){ long start = System.currentTimeMillis(); + initializeMap(newMap); + String sql = getSqlSupplier.get(); try { connection = manger.getConnection(); statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); + int i = 0; while (resultSet.next()) { - BaseDocument newDoc = getVertexFqdnDocument(resultSet); + T newDoc = formatResultFunc.apply(resultSet); if (newDoc != null) { - putMapByHashcode(newDoc, newVertexFqdnMap); + i+=1; + putMapByHashcode(newDoc, newMap); } } long last = System.currentTimeMillis(); - LOG.info(sql + "\n读取clickhouse v_FQDN时间:" + (last - start)); - } catch (Exception e) { - LOG.error(e.toString()); - }finally { - manger.clear(statement,connection); - } - } - - void baseVertexIp() { - initializeMap(newVertexIpMap); - LOG.info("IP resultMap初始化完成"); - String sql = getVertexIpSql(); - long start = System.currentTimeMillis(); - try { - connection = manger.getConnection(); - statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql); - while (resultSet.next()) { - BaseDocument newDoc = getVertexIpDocument(resultSet); - putMapByHashcode(newDoc, newVertexIpMap); - } - long last = System.currentTimeMillis(); - LOG.info(sql + "\n读取clickhouse v_IP时间:" + (last - start)); - } catch (Exception e) { - LOG.error(e.toString()); - }finally { - manger.clear(statement,connection); - } - } - - void baseVertexSubscriber(){ - initializeMap(newVertexSubscriberMap); - LOG.info("SUBSCRIBER resultMap初始化完成"); - String sql = getVertexSubscriberSql(); - long start = System.currentTimeMillis(); - try { - connection = manger.getConnection(); - statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql); - while (resultSet.next()){ - 
BaseDocument newDoc = getVertexSubscriberDocument(resultSet); - putMapByHashcode(newDoc, newVertexSubscriberMap); - } - long last = System.currentTimeMillis(); - LOG.info(sql + "\n读取clickhouse v_SUBSCRIBER时间:" + (last - start)); - }catch (Exception e){ - LOG.error(sql + "\n读取clickhouse v_SUBSCRIBER失败"); - e.printStackTrace(); - }finally { - manger.clear(statement,connection); - } - } - - void baseRelationshipSubscriberLocateIp(){ - initializeMap(newRelationSubsciberLocateIpMap); - LOG.info("R_LOCATE_SUBSCRIBER2IP"); - String sql = getRelationshipSubsciberLocateIpSql(); - long start = System.currentTimeMillis(); - try { - connection = manger.getConnection(); - statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql); - while (resultSet.next()){ - BaseEdgeDocument newDoc = getRelationshipSubsciberLocateIpDocument(resultSet); - putMapByHashcode(newDoc, newRelationSubsciberLocateIpMap); - } - long last = System.currentTimeMillis(); - LOG.info(sql + "\n读取clickhouse ESubsciberLocateIp时间:" + (last - start)); - }catch (Exception e){ - LOG.error(sql + "\n读取clickhouse ESubsciberLocateIp失败"); - e.printStackTrace(); - }finally { - manger.clear(statement,connection); - } - } - - void baseRelationshipFqdnAddressIp() { - initializeMap(newRelationFqdnAddressIpMap); - LOG.info("R_LOCATE_FQDN2IP resultMap初始化完成"); - String sql = getRelationshipFqdnAddressIpSql(); - long start = System.currentTimeMillis(); - try { - connection = manger.getConnection(); - statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql); - - while (resultSet.next()) { - BaseEdgeDocument newDoc = getRelationFqdnAddressIpDocument(resultSet); - putMapByHashcode(newDoc, newRelationFqdnAddressIpMap); - } - long last = System.currentTimeMillis(); - LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start)); - } catch (Exception e) { - LOG.error(e.toString()); - }finally { - manger.clear(statement,connection); - } - } - - void 
baseRelationshipFqdnSameFqdn(){ - initializeMap(newRelationFqdnSameFqdnMap); - LOG.info("R_SAME_ORIGIN_FQDN2FQDN resultMap初始化完成"); - String sql = getRelationshipFqdnSameFqdnSql(); - long start = System.currentTimeMillis(); - try { - connection = manger.getConnection(); - statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql); - while (resultSet.next()) { - BaseEdgeDocument newDoc = getRelationshipFqdnSameFqdnDocument(resultSet); - putMapByHashcode(newDoc, newRelationFqdnSameFqdnMap); - } - long last = System.currentTimeMillis(); - LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start)); + LOG.info(sql + "\n读取"+i+"条数据,运行时间:" + (last - start)); }catch (Exception e){ e.printStackTrace(); }finally { @@ -171,32 +61,10 @@ public class BaseClickhouseData { } } - void baseRelationshipIpVisitFqdn() { - initializeMap(newRelationIpVisitFqdnMap); - LOG.info("R_VISIT_IP2FQDN resultMap初始化完成"); - String sql = getRelationshipIpVisitFqdnSql(); - long start = System.currentTimeMillis(); - try { - connection = manger.getConnection(); - statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql); - while (resultSet.next()) { - BaseEdgeDocument newDoc = getRelationIpVisitFqdnDocument(resultSet); - putMapByHashcode(newDoc, newRelationIpVisitFqdnMap); - } - long last = System.currentTimeMillis(); - LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start)); - } catch (Exception e) { - LOG.error(e.toString()); - }finally { - manger.clear(statement,connection); - } - } - private void initializeMap(HashMap>> map){ try { for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - map.put(i, new HashMap<>()); + map.put(i, new HashMap<>(16)); } }catch (Exception e){ e.printStackTrace(); diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index da4c1a2..18f7e85 100644 --- 
a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -1,9 +1,11 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.service.read.ReadClickhouseData; import cn.ac.iie.service.update.Document; import cn.ac.iie.service.update.relationship.LocateFqdn2Ip; import cn.ac.iie.service.update.relationship.LocateSubscriber2Ip; +import cn.ac.iie.service.update.relationship.SameFqdn2Fqdn; import cn.ac.iie.service.update.relationship.VisitIp2Fqdn; import cn.ac.iie.service.update.vertex.Fqdn; import cn.ac.iie.service.update.vertex.Ip; @@ -15,9 +17,14 @@ import com.arangodb.entity.BaseEdgeDocument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Constructor; +import java.sql.ResultSet; import java.util.ArrayList; import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.function.Function; +import java.util.function.Supplier; import static cn.ac.iie.dao.BaseArangoData.*; import static cn.ac.iie.dao.BaseClickhouseData.*; @@ -31,34 +38,32 @@ public class UpdateGraphData { private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance(); private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + private static BaseArangoData baseArangoData = new BaseArangoData(); private static BaseClickhouseData baseClickhouseData = new BaseClickhouseData(); - private CountDownLatch countDownLatch; public void updateArango(){ long start = System.currentTimeMillis(); try { - BaseArangoData baseArangoData = new BaseArangoData(); + updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", + Fqdn.class,BaseDocument.class, + ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument); - baseArangoData.readHistoryData("FQDN", historyVertexFqdnMap,BaseDocument.class); - updateVertexFqdn(); + 
updateDocument(newVertexIpMap,historyVertexIpMap,"IP", + Ip.class,BaseDocument.class, + ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument); - baseArangoData.readHistoryData("IP", historyVertexIpMap,BaseDocument.class); - updateVertexIp(); + updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP", + LocateFqdn2Ip.class,BaseEdgeDocument.class, + ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument); -// baseArangoData.readHistoryData("SUBSCRIBER", historyVertexSubscriberMap,BaseDocument.class); -// updateVertexSubscriber(); + updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN", + VisitIp2Fqdn.class,BaseEdgeDocument.class, + ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument); - baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap,BaseEdgeDocument.class); - updateRelationFqdnAddressIp(); + updateDocument(newRelationFqdnSameFqdnMap,historyRelationFqdnSameFqdnMap,"R_SAME_ORIGIN_FQDN2FQDN", + SameFqdn2Fqdn.class,BaseEdgeDocument.class, + ReadClickhouseData::getRelationshipFqdnSameFqdnSql,ReadClickhouseData::getRelationshipFqdnSameFqdnDocument); - baseArangoData.readHistoryData("R_VISIT_IP2FQDN", historyRelationIpVisitFqdnMap,BaseEdgeDocument.class); - updateRelationIpVisitFqdn(); - - baseArangoData.readHistoryData("R_SAME_ORIGIN_FQDN2FQDN",historyRelationFqdnSameFqdnMap,BaseEdgeDocument.class); - updateRelationFqdnSameFqdn(); - -// baseArangoData.readHistoryData("R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap,BaseEdgeDocument.class); -// updateRelationshipSubsciberLocateIp(); long last = System.currentTimeMillis(); LOG.info("更新图数据库时间共计:"+(last - start)); @@ -70,151 +75,44 @@ public class UpdateGraphData { } } - private void updateVertexFqdn(){ + private void updateDocument(HashMap>> newMap, + ConcurrentHashMap> historyMap, + 
String collection, + Class> taskType, + Class docmentType, + Supplier getSqlSupplier, + Function formatResultFunc) { try { + + baseArangoData.readHistoryData(collection,historyMap,docmentType); + LOG.info(collection+" 读取clickhouse,封装结果集"); + baseClickhouseData.baseDocumentFromClickhouse(newMap, getSqlSupplier,formatResultFunc); + + LOG.info(collection+" 开始更新"); long start = System.currentTimeMillis(); - baseClickhouseData.baseVertexFqdn(); - countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> tmpMap = newVertexFqdnMap.get(i); - Document updateFqdn = new Fqdn(tmpMap, arangoManger, "FQDN", historyVertexFqdnMap,countDownLatch); - pool.executor(updateFqdn); + CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){ + HashMap> tmpNewMap = newMap.get(i); + ConcurrentHashMap tmpHisMap = historyMap.get(i); + Constructor constructor = taskType.getConstructor( + HashMap.class, + ArangoDBConnect.class, + String.class, + ConcurrentHashMap.class, + CountDownLatch.class); + Document docTask = (Document)constructor.newInstance(tmpNewMap, arangoManger, collection, tmpHisMap, countDownLatch); + pool.executor(docTask); } countDownLatch.await(); long last = System.currentTimeMillis(); - LOG.info("FQDN vertex 更新完毕,共耗时:"+(last-start)); + LOG.info(collection+" 更新完毕,共耗时:"+(last-start)); }catch (Exception e){ e.printStackTrace(); }finally { - historyVertexFqdnMap.clear(); - newVertexFqdnMap.clear(); + newMap.clear(); + historyMap.clear(); } } - private void updateVertexSubscriber(){ - try { - long start = System.currentTimeMillis(); - baseClickhouseData.baseVertexSubscriber(); - countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> tmpMap = newVertexSubscriberMap.get(i); - Subscriber 
updateSubscriber = new Subscriber(tmpMap, arangoManger, "SUBSCRIBER", historyVertexSubscriberMap,countDownLatch); - pool.executor(updateSubscriber); - } - countDownLatch.await(); - long last = System.currentTimeMillis(); - LOG.info("SUBSCRIBER vertex 更新完毕,共耗时:"+(last-start)); - }catch (Exception e){ - e.printStackTrace(); - }finally { - historyVertexSubscriberMap.clear(); - newVertexSubscriberMap.clear(); - } - } - - private void updateRelationshipSubsciberLocateIp(){ - try { - long start = System.currentTimeMillis(); - baseClickhouseData.baseRelationshipSubscriberLocateIp(); - countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> tmpMap = newRelationSubsciberLocateIpMap.get(i); - LocateSubscriber2Ip locateSubscriber2Ip = new LocateSubscriber2Ip(tmpMap, arangoManger, "R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap, countDownLatch); - pool.executor(locateSubscriber2Ip); - } - countDownLatch.await(); - long last = System.currentTimeMillis(); - LOG.info("R_LOCATE_SUBSCRIBER2IP relationship 更新完毕,共耗时:"+(last-start)); - }catch (Exception e){ - e.printStackTrace(); - }finally { - historyRelationSubsciberLocateIpMap.clear(); - newRelationSubsciberLocateIpMap.clear(); - } - } - - private void updateVertexIp(){ - try { - long start = System.currentTimeMillis(); - baseClickhouseData.baseVertexIp(); - countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> tmpMap = newVertexIpMap.get(i); - Ip updateIp = new Ip(tmpMap, arangoManger, "IP", historyVertexIpMap, countDownLatch); - pool.executor(updateIp); - } - countDownLatch.await(); - long last = System.currentTimeMillis(); - LOG.info("IP vertex 更新完毕,共耗时:"+(last-start)); - }catch (Exception e){ - e.printStackTrace(); - }finally { - historyVertexIpMap.clear(); - newVertexIpMap.clear(); - } - } - - private void 
updateRelationFqdnAddressIp(){ - try { - long start = System.currentTimeMillis(); - baseClickhouseData.baseRelationshipFqdnAddressIp(); - countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> tmpMap = newRelationFqdnAddressIpMap.get(i); - LocateFqdn2Ip fqdnAddressIp = new LocateFqdn2Ip(tmpMap, arangoManger, "R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, countDownLatch); - pool.executor(fqdnAddressIp); - } - countDownLatch.await(); - long last = System.currentTimeMillis(); - LOG.info("R_LOCATE_FQDN2IP relationship 更新完毕,共耗时:"+(last-start)); - }catch (Exception e){ - e.printStackTrace(); - }finally { - historyRelationFqdnAddressIpMap.clear(); - newRelationFqdnAddressIpMap.clear(); - } - } - - private void updateRelationIpVisitFqdn(){ - try { - long start = System.currentTimeMillis(); - baseClickhouseData.baseRelationshipIpVisitFqdn(); - countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> tmpMap = newRelationIpVisitFqdnMap.get(i); - VisitIp2Fqdn ipVisitFqdn = new VisitIp2Fqdn(tmpMap,arangoManger,"R_VISIT_IP2FQDN", historyRelationIpVisitFqdnMap,countDownLatch); - pool.executor(ipVisitFqdn); - } - countDownLatch.await(); - long last = System.currentTimeMillis(); - LOG.info("R_VISIT_IP2FQDN ralationship 更新完毕,共耗时:"+(last-start)); - }catch (Exception e){ - e.printStackTrace(); - }finally { - historyRelationIpVisitFqdnMap.clear(); - newRelationIpVisitFqdnMap.clear(); - } - } - - private void updateRelationFqdnSameFqdn(){ - try { - long start = System.currentTimeMillis(); - baseClickhouseData.baseRelationshipFqdnSameFqdn(); - countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { - HashMap> tmpMap = newRelationFqdnSameFqdnMap.get(i); - VisitIp2Fqdn ipVisitFqdn = new 
VisitIp2Fqdn(tmpMap,arangoManger,"R_SAME_ORIGIN_FQDN2FQDN", historyRelationFqdnSameFqdnMap,countDownLatch); - pool.executor(ipVisitFqdn); - } - countDownLatch.await(); - long last = System.currentTimeMillis(); - LOG.info("R_SAME_ORIGIN_FQDN2FQDN ralationship 更新完毕,共耗时:"+(last-start)); - }catch (Exception e){ - e.printStackTrace(); - }finally { - historyRelationFqdnSameFqdnMap.clear(); - newRelationFqdnSameFqdnMap.clear(); - } - } } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java index fdc8f86..4918963 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java @@ -8,7 +8,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.ResultSet; -import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -25,8 +24,9 @@ public class ReadClickhouseData { private static final Logger LOG = LoggerFactory.getLogger(ReadClickhouseData.class); - public static final Integer DISTINCT_CLIENT_IP_NUM = 100; - public static final Integer RECENT_COUNT_HOUR = 24; + private static long[] timeLimit = getTimeLimit(); + public static final Integer DISTINCT_CLIENT_IP_NUM = ApplicationConfig.DISTINCT_CLIENT_IP_NUM; + public static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR; public static final HashSet PROTOCOL_SET; static { @@ -36,156 +36,183 @@ public class ReadClickhouseData { PROTOCOL_SET.add("DNS"); } - public static BaseDocument getVertexFqdnDocument(ResultSet resultSet) throws SQLException { - String fqdnOrReferer = resultSet.getString("FQDN"); - String fqdnName = TopDomainUtils.getDomainFromUrl(fqdnOrReferer); + public static BaseDocument getVertexFqdnDocument(ResultSet resultSet){ BaseDocument newDoc = null; - if (isDomain(fqdnName)) 
{ - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - newDoc = new BaseDocument(); - newDoc.setKey(fqdnName); - newDoc.addAttribute("FQDN_NAME", fqdnName); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - } - return newDoc; - } - - public static BaseDocument getVertexIpDocument(ResultSet resultSet) throws SQLException { - BaseDocument newDoc = new BaseDocument(); - String ip = resultSet.getString("IP"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long sessionCount = resultSet.getLong("SESSION_COUNT"); - long bytesSum = resultSet.getLong("BYTES_SUM"); - String ipType = resultSet.getString("ip_type"); - newDoc.setKey(ip); - newDoc.addAttribute("IP", ip); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - switch (ipType) { - case "client": - newDoc.addAttribute("CLIENT_SESSION_COUNT", sessionCount); - newDoc.addAttribute("CLIENT_BYTES_SUM", bytesSum); - newDoc.addAttribute("SERVER_SESSION_COUNT", 0L); - newDoc.addAttribute("SERVER_BYTES_SUM", 0L); - break; - case "server": - newDoc.addAttribute("SERVER_SESSION_COUNT", sessionCount); - newDoc.addAttribute("SERVER_BYTES_SUM", bytesSum); - newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L); - newDoc.addAttribute("CLIENT_BYTES_SUM", 0L); - break; - default: - newDoc.addAttribute("SERVER_SESSION_COUNT", 0L); - newDoc.addAttribute("SERVER_BYTES_SUM", 0L); - newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L); - newDoc.addAttribute("CLIENT_BYTES_SUM", 0L); - break; - } -// newDoc.addAttribute("COMMON_LINK_INFO", ""); - return newDoc; - } - - public static BaseDocument getVertexSubscriberDocument(ResultSet resultSet) throws SQLException { - String subscriberId = resultSet.getString("common_subscriber_id"); - long lastFoundTime = 
resultSet.getLong("LAST_FOUND_TIME"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - BaseDocument newDoc = new BaseDocument(); - newDoc.setKey(subscriberId); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - return newDoc; - } - - public static BaseEdgeDocument getRelationshipSubsciberLocateIpDocument(ResultSet resultSet) throws SQLException { - String subscriberId = resultSet.getString("common_subscriber_id"); - String framedIp = resultSet.getString("radius_framed_ip"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = resultSet.getLong("COUNT_TOTAL"); - - String key = subscriberId + "-" + framedIp; - BaseEdgeDocument newDoc = new BaseEdgeDocument(); - newDoc.setKey(key); - newDoc.setFrom("SUBSCRIBER/" + subscriberId); - newDoc.setTo("IP/" + framedIp); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - newDoc.addAttribute("COUNT_TOTAL", countTotal); - - return newDoc; - - } - - public static BaseEdgeDocument getRelationFqdnAddressIpDocument(ResultSet resultSet) throws SQLException { - String vFqdn = resultSet.getString("FQDN"); - BaseEdgeDocument newDoc = null; - if (isDomain(vFqdn)) { - String vIp = resultSet.getString("common_server_ip"); - long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); - long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = resultSet.getLong("COUNT_TOTAL"); - String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray(); - long[] clientIpTs = new long[distCipRecents.length]; - for (int i = 0; i < clientIpTs.length; i++) { - clientIpTs[i] = currentHour; + try { + String fqdnOrReferer = resultSet.getString("FQDN"); + String fqdnName = TopDomainUtils.getDomainFromUrl(fqdnOrReferer); + if (isDomain(fqdnName)) { + long firstFoundTime = 
resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + newDoc = new BaseDocument(); + newDoc.setKey(fqdnName); + newDoc.addAttribute("FQDN_NAME", fqdnName); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); } - - String key = vFqdn + "-" + vIp; - newDoc = new BaseEdgeDocument(); - newDoc.setKey(key); - newDoc.setFrom("FQDN/" + vFqdn); - newDoc.setTo("IP/" + vIp); - newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); - newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - newDoc.addAttribute("CNT_TOTAL",countTotal); - newDoc.addAttribute("DIST_CIP", distCipRecents); - newDoc.addAttribute("DIST_CIP_TS", clientIpTs); - + }catch (Exception e){ + e.printStackTrace(); } return newDoc; } - public static BaseEdgeDocument getRelationshipFqdnSameFqdnDocument(ResultSet resultSet) throws SQLException { - BaseEdgeDocument newDoc = null; - String domainFqdn = resultSet.getString("domainFqdn"); - String referer = resultSet.getString("referer"); - String refererFqdn = TopDomainUtils.getDomainFromUrl(referer); - if (isDomain(refererFqdn) && isDomain(domainFqdn)){ + public static BaseDocument getVertexIpDocument(ResultSet resultSet){ + BaseDocument newDoc = new BaseDocument(); + try { + String ip = resultSet.getString("IP"); long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); - long countTotal = resultSet.getLong("COUNT_TOTAL"); - String key = domainFqdn + "-" + refererFqdn; - newDoc = new BaseEdgeDocument(); - newDoc.setKey(key); - newDoc.setFrom("FQDN/" + domainFqdn); - newDoc.setTo("FQDN/" + refererFqdn); + long sessionCount = resultSet.getLong("SESSION_COUNT"); + long bytesSum = resultSet.getLong("BYTES_SUM"); + String ipType = resultSet.getString("ip_type"); + newDoc.setKey(ip); + newDoc.addAttribute("IP", ip); newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); 
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); - newDoc.addAttribute("CNT_TOTAL",countTotal); + switch (ipType) { + case "client": + newDoc.addAttribute("CLIENT_SESSION_COUNT", sessionCount); + newDoc.addAttribute("CLIENT_BYTES_SUM", bytesSum); + newDoc.addAttribute("SERVER_SESSION_COUNT", 0L); + newDoc.addAttribute("SERVER_BYTES_SUM", 0L); + break; + case "server": + newDoc.addAttribute("SERVER_SESSION_COUNT", sessionCount); + newDoc.addAttribute("SERVER_BYTES_SUM", bytesSum); + newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L); + newDoc.addAttribute("CLIENT_BYTES_SUM", 0L); + break; + default: + newDoc.addAttribute("SERVER_SESSION_COUNT", 0L); + newDoc.addAttribute("SERVER_BYTES_SUM", 0L); + newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L); + newDoc.addAttribute("CLIENT_BYTES_SUM", 0L); + break; + } +// newDoc.addAttribute("COMMON_LINK_INFO", ""); + }catch (Exception e){ + e.printStackTrace(); } return newDoc; } - public static BaseEdgeDocument getRelationIpVisitFqdnDocument(ResultSet resultSet) throws SQLException { - BaseEdgeDocument newDoc = null; - String vFqdn = resultSet.getString("FQDN"); - if (isDomain(vFqdn)) { - String vIp = resultSet.getString("common_client_ip"); - String key = vIp + "-" + vFqdn; + public static BaseDocument getVertexSubscriberDocument(ResultSet resultSet){ + BaseDocument newDoc = new BaseDocument(); + try { + String subscriberId = resultSet.getString("common_subscriber_id"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + newDoc.setKey(subscriberId); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + }catch (Exception e){ + e.printStackTrace(); + } + return newDoc; + } + + public static BaseEdgeDocument getRelationshipSubsciberLocateIpDocument(ResultSet resultSet){ + BaseEdgeDocument newDoc = new BaseEdgeDocument(); + try { + String subscriberId = 
resultSet.getString("common_subscriber_id"); + String framedIp = resultSet.getString("radius_framed_ip"); long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); long countTotal = resultSet.getLong("COUNT_TOTAL"); - newDoc = new BaseEdgeDocument(); + String key = subscriberId + "-" + framedIp; newDoc.setKey(key); - newDoc.setFrom("IP/" + vIp); - newDoc.setTo("FQDN/" + vFqdn); - newDoc.addAttribute("CNT_TOTAL",countTotal); + newDoc.setFrom("SUBSCRIBER/" + subscriberId); + newDoc.setTo("IP/" + framedIp); newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("COUNT_TOTAL", countTotal); + }catch (Exception e){ + e.printStackTrace(); + } + return newDoc; + + } + + public static BaseEdgeDocument getRelationFqdnAddressIpDocument(ResultSet resultSet){ + BaseEdgeDocument newDoc = null; + try { + String vFqdn = resultSet.getString("FQDN"); + if (isDomain(vFqdn)) { + String vIp = resultSet.getString("common_server_ip"); + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray(); + long[] clientIpTs = new long[distCipRecents.length]; + for (int i = 0; i < clientIpTs.length; i++) { + clientIpTs[i] = currentHour; + } + + String key = vFqdn + "-" + vIp; + newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("FQDN/" + vFqdn); + newDoc.setTo("IP/" + vIp); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("CNT_TOTAL",countTotal); + newDoc.addAttribute("DIST_CIP", distCipRecents); + newDoc.addAttribute("DIST_CIP_TS", clientIpTs); + + } + }catch (Exception e){ + e.printStackTrace(); + } + return newDoc; + } + + public static 
BaseEdgeDocument getRelationshipFqdnSameFqdnDocument(ResultSet resultSet){ + BaseEdgeDocument newDoc = null; + try { + String domainFqdn = resultSet.getString("domainFqdn"); + String referer = resultSet.getString("referer"); + String refererFqdn = TopDomainUtils.getDomainFromUrl(referer); + if (isDomain(refererFqdn) && isDomain(domainFqdn)){ + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + String key = domainFqdn + "-" + refererFqdn; + newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("FQDN/" + domainFqdn); + newDoc.setTo("FQDN/" + refererFqdn); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + newDoc.addAttribute("CNT_TOTAL",countTotal); + } + }catch (Exception e){ + e.printStackTrace(); + } + return newDoc; + } + + public static BaseEdgeDocument getRelationIpVisitFqdnDocument(ResultSet resultSet){ + BaseEdgeDocument newDoc = null; + try { + String vFqdn = resultSet.getString("FQDN"); + if (isDomain(vFqdn)) { + String vIp = resultSet.getString("common_client_ip"); + String key = vIp + "-" + vFqdn; + long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME"); + long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME"); + long countTotal = resultSet.getLong("COUNT_TOTAL"); + + newDoc = new BaseEdgeDocument(); + newDoc.setKey(key); + newDoc.setFrom("IP/" + vIp); + newDoc.setTo("FQDN/" + vFqdn); + newDoc.addAttribute("CNT_TOTAL",countTotal); + newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime); + newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime); + } + }catch (Exception e){ + e.printStackTrace(); } return newDoc; } @@ -206,6 +233,12 @@ public class ReadClickhouseData { if (fqdn == null || fqdn.length() == 0){ return false; } + if (fqdn.contains(":")){ + String s = fqdn.split(":")[0]; + if (s.contains(":")){ + return false; + } + } String[] 
fqdnArr = fqdn.split("\\."); if (fqdnArr.length < 4 || fqdnArr.length > 4) { return true; @@ -245,7 +278,6 @@ public class ReadClickhouseData { } public static String getVertexFqdnSql() { - long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime; @@ -255,7 +287,6 @@ public class ReadClickhouseData { } public static String getVertexIpSql() { - long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; String where = " recv_time >= " + minTime + " AND recv_time < " + maxTime; @@ -265,7 +296,6 @@ public class ReadClickhouseData { } public static String getRelationshipFqdnAddressIpSql() { - long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND s1_domain != '' AND s1_d_ip != '' "; @@ -273,7 +303,6 @@ public class ReadClickhouseData { } public static String getRelationshipFqdnSameFqdnSql(){ - long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND s1_domain != '' AND s1_referer != '' "; @@ -281,7 +310,6 @@ public class ReadClickhouseData { } public static String getRelationshipIpVisitFqdnSql() { - long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" AND s1_s_ip != '' AND s1_domain != '' "; @@ -289,7 +317,6 @@ public class ReadClickhouseData { } public static String getVertexSubscriberSql() { - long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; @@ -297,7 +324,6 @@ public class 
ReadClickhouseData { } public static String getRelationshipSubsciberLocateIpSql() { - long[] timeLimit = getTimeLimit(); long maxTime = timeLimit[0]; long minTime = timeLimit[1]; String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1"; @@ -305,10 +331,19 @@ public class ReadClickhouseData { } private static long[] getTimeLimit() { -// long maxTime = currentHour; -// long minTime = maxTime - 3600; - long maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME; - long minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME; + long maxTime = 0L; + long minTime = 0L; + switch (ApplicationConfig.TIME_LIMIT_TYPE) { + case 0: + maxTime = currentHour; + minTime = maxTime - ApplicationConfig.UPDATE_INTERVAL; + break; + case 1: + maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME; + minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME; + break; + default: + } return new long[]{maxTime, minTime}; } diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java index 93a0e4d..0b4eda5 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java @@ -1,5 +1,6 @@ package cn.ac.iie.service.read; +import cn.ac.iie.config.ApplicationConfig; import cn.ac.iie.utils.ArangoDBConnect; import com.arangodb.ArangoCursor; import com.arangodb.entity.BaseDocument; @@ -11,6 +12,8 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import static cn.ac.iie.service.read.ReadClickhouseData.RECENT_COUNT_HOUR; + /** * @author wlh * 多线程全量读取arangoDb历史数据,封装到map @@ -20,12 +23,17 @@ public class ReadHistoryArangoData extends Thread { private 
ArangoDBConnect arangoConnect; private String query; - private ConcurrentHashMap map; + private ConcurrentHashMap> map; private Class type; private String table; private CountDownLatch countDownLatch; - public ReadHistoryArangoData(ArangoDBConnect arangoConnect, String query, ConcurrentHashMap map, Class type, String table,CountDownLatch countDownLatch) { + public ReadHistoryArangoData(ArangoDBConnect arangoConnect, + String query, + ConcurrentHashMap> map, + Class type, + String table, + CountDownLatch countDownLatch) { this.arangoConnect = arangoConnect; this.query = query; this.map = map; @@ -44,16 +52,19 @@ public class ReadHistoryArangoData extends Thread { int i = 0; for (T doc : baseDocuments) { String key = doc.getKey(); - map.put(key, doc); + int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER; + ConcurrentHashMap tmpMap = map.get(hashCode); + tmpMap.put(key, doc); i++; } long l = System.currentTimeMillis(); - LOG.info(query + "\n读取数据" + i + "条,运行时间:" + (l - s)); + LOG.info(query + "\n读取" + i + "条数据,运行时间:" + (l - s)); } }catch (Exception e){ e.printStackTrace(); }finally { countDownLatch.countDown(); + LOG.info("本线程读取完毕,剩余线程数量:"+countDownLatch.getCount()); } } @@ -63,7 +74,7 @@ public class ReadHistoryArangoData extends Thread { String protocolRecent = protocol + "_CNT_RECENT"; ArrayList cntRecent = (ArrayList) doc.getAttribute(protocolRecent); Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]); - Long[] cntRecentsDst = new Long[24]; + Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR]; System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1); cntRecentsDst[0] = 0L; doc.addAttribute(protocolRecent, cntRecentsDst); diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java index 834b1ff..8d69b46 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java +++ 
b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java @@ -8,7 +8,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -20,30 +19,28 @@ public class Document extends Thread{ private String collectionName; private ConcurrentHashMap historyDocumentMap; private CountDownLatch countDownLatch; - private Class type; Document(HashMap> newDocumentMap, ArangoDBConnect arangoManger, String collectionName, ConcurrentHashMap historyDocumentMap, - CountDownLatch countDownLatch, - Class type) { + CountDownLatch countDownLatch) { this.newDocumentMap = newDocumentMap; this.arangoManger = arangoManger; this.collectionName = collectionName; this.historyDocumentMap = historyDocumentMap; this.countDownLatch = countDownLatch; - this.type = type; } @Override public void run() { - LOG.info(collectionName+" new Map 大小:"+newDocumentMap.size()); - Set keySet = newDocumentMap.keySet(); - ArrayList resultDocumentList = new ArrayList<>(); - int i = 0; + long start = System.currentTimeMillis(); + LOG.info("新读取数据"+newDocumentMap.size()+"条,历史数据"+historyDocumentMap.size()+"条"); try { + Set keySet = newDocumentMap.keySet(); + ArrayList resultDocumentList = new ArrayList<>(); + int i = 0; for (String key : keySet) { ArrayList newDocumentSchemaList = newDocumentMap.getOrDefault(key, null); if (newDocumentSchemaList != null) { @@ -67,6 +64,8 @@ public class Document extends Thread{ LOG.error(e.toString()); }finally { countDownLatch.countDown(); + long last = System.currentTimeMillis(); + LOG.info("本线程更新完毕,用时:"+(last-start)+",剩余线程数量:"+countDownLatch.getCount()); } } @@ -84,13 +83,12 @@ public class Document extends Thread{ historyDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime); } - private T mergeDocument(ArrayList newDocumentSchemaList) throws IllegalAccessException, InstantiationException { + 
private T mergeDocument(ArrayList newDocumentSchemaList){ if (newDocumentSchemaList == null || newDocumentSchemaList.isEmpty()){ return null; }else if (newDocumentSchemaList.size() == 1){ return newDocumentSchemaList.get(0); }else { -// T newDocument = type.newInstance(); T newDocument = null; for (T lastDoc:newDocumentSchemaList){ if (newDocument == null){ diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java index 447f7fa..ab2c849 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java @@ -16,7 +16,7 @@ public class Relationship extends Document { String collectionName, ConcurrentHashMap historyDocumentMap, CountDownLatch countDownLatch) { - super(newDocumentHashMap,arangoManger,collectionName,historyDocumentMap,countDownLatch,BaseEdgeDocument.class); + super(newDocumentHashMap,arangoManger,collectionName,historyDocumentMap,countDownLatch); } @Override diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java index 83b7497..e9b19e4 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java @@ -18,7 +18,7 @@ public class Vertex extends Document { String collectionName, ConcurrentHashMap historyDocumentMap, CountDownLatch countDownLatch) { - super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch,BaseDocument.class); + super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch); } @Override diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java index 
b736162..cc2f2ed 100644 --- a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java +++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java @@ -56,7 +56,7 @@ public class TopDomainUtils { InputStreamReader read = new InputStreamReader( new FileInputStream(file), encoding); BufferedReader bufferedReader = new BufferedReader(read); - String lineTxt = null; + String lineTxt; while ((lineTxt = bufferedReader.readLine()) != null) { HashMap fullTop = maps.get("full"); fullTop.put(lineTxt, lineTxt); @@ -85,9 +85,9 @@ public class TopDomainUtils { try { String encoding = "UTF-8"; File file = new File(filePath); - if (file.isFile() && file.exists()) { //判断文件是否存在 + if (file.isFile() && file.exists()) { InputStreamReader read = new InputStreamReader( - new FileInputStream(file), encoding);//考虑到编码格式 + new FileInputStream(file), encoding); BufferedReader bufferedReader = new BufferedReader(read); String lineTxt = null; while ((lineTxt = bufferedReader.readLine()) != null) { diff --git a/ip-learning-java-test/src/main/resources/application.properties b/ip-learning-java-test/src/main/resources/application.properties index 92e602a..3dbc1a9 100644 --- a/ip-learning-java-test/src/main/resources/application.properties +++ b/ip-learning-java-test/src/main/resources/application.properties @@ -13,5 +13,13 @@ update.arango.batch=10000 thread.pool.number=10 thread.await.termination.time=10 + + +#读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围 +time.limit.type=1 read.clickhouse.max.time=1571245220 -read.clickhouse.min.time=1571245210 \ No newline at end of file +read.clickhouse.min.time=1571245210 + +update.interval=3600 +distinct.client.ip.num=10000 +recent.count.hour=24 \ No newline at end of file diff --git a/ip-learning-java-test/src/main/resources/log4j.properties b/ip-learning-java-test/src/main/resources/log4j.properties index 21cea3d..ee350e5 100644 --- a/ip-learning-java-test/src/main/resources/log4j.properties +++ 
b/ip-learning-java-test/src/main/resources/log4j.properties @@ -4,19 +4,20 @@ log4j.logger.org.apache.http.wire=OFF #Log4j log4j.rootLogger=info,console,file -# ̨־ +# Console log configuration log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.Threshold=info log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n -# ļ־ +# File log configuration log4j.appender.file=org.apache.log4j.DailyRollingFileAppender log4j.appender.file.Threshold=info log4j.appender.file.encoding=UTF-8 log4j.appender.file.Append=true -#··زӦĿ +# Log file path (relative for local runs; point at the matching deployment directory in production) #log4j.appender.file.file=/home/ceiec/iplearning/logs/ip-learning-application.log +#log4j.appender.file.file=/home/ceiec/iplearning/testLog/ip-learning-application.log log4j.appender.file.file=./logs/ip-learning-application.log log4j.appender.file.DatePattern='.'yyyy-MM-dd log4j.appender.file.layout=org.apache.log4j.PatternLayout diff --git a/ip-learning-java-test/src/test/java/cn/ac/iie/TestReadLine.java b/ip-learning-java-test/src/test/java/cn/ac/iie/TestReadLine.java new file mode 100644 index 0000000..abc7666 --- /dev/null +++ b/ip-learning-java-test/src/test/java/cn/ac/iie/TestReadLine.java @@ -0,0 +1,20 @@ +package cn.ac.iie; + +import java.io.*; + +public class TestReadLine { + public static void main(String[] args) throws Exception { + String encoding = "UTF-8"; + File file = new File("C:\\Users\\94976\\Desktop\\test.txt"); + InputStreamReader read = new InputStreamReader( + new FileInputStream(file), encoding); + BufferedReader bufferedReader = new BufferedReader(read); + String lineTxt = null; + long sum = 0L; + while ((lineTxt = bufferedReader.readLine()) != null){ + long num = Long.parseLong(lineTxt); + sum = sum + num; + } + System.out.println(sum); + } +} diff --git a/ip-learning-java-test/src/test/java/cn/ac/iie/TestReflect.java
b/ip-learning-java-test/src/test/java/cn/ac/iie/TestReflect.java new file mode 100644 index 0000000..6a6b1b3 --- /dev/null +++ b/ip-learning-java-test/src/test/java/cn/ac/iie/TestReflect.java @@ -0,0 +1,20 @@ +package cn.ac.iie; + +import cn.ac.iie.service.update.vertex.Fqdn; +import cn.ac.iie.utils.ArangoDBConnect; + +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +public class TestReflect { + public static void main(String[] args) throws Exception { + Class fqdnClass = Fqdn.class; +// for(Constructor constructor : fqdnClass.getConstructors()){ +// System.out.println(constructor); +// } + Constructor constructor = fqdnClass.getConstructor(HashMap.class, ArangoDBConnect.class, String.class, ConcurrentHashMap.class, CountDownLatch.class); + System.out.println(constructor); + } +}