调整日志输出。
This commit is contained in:
@@ -8,7 +8,6 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@@ -25,8 +24,9 @@ public class ReadClickhouseData {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReadClickhouseData.class);
|
||||
|
||||
|
||||
public static final Integer DISTINCT_CLIENT_IP_NUM = 100;
|
||||
public static final Integer RECENT_COUNT_HOUR = 24;
|
||||
private static long[] timeLimit = getTimeLimit();
|
||||
public static final Integer DISTINCT_CLIENT_IP_NUM = ApplicationConfig.DISTINCT_CLIENT_IP_NUM;
|
||||
public static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR;
|
||||
public static final HashSet<String> PROTOCOL_SET;
|
||||
|
||||
static {
|
||||
@@ -36,156 +36,183 @@ public class ReadClickhouseData {
|
||||
PROTOCOL_SET.add("DNS");
|
||||
}
|
||||
|
||||
public static BaseDocument getVertexFqdnDocument(ResultSet resultSet) throws SQLException {
|
||||
String fqdnOrReferer = resultSet.getString("FQDN");
|
||||
String fqdnName = TopDomainUtils.getDomainFromUrl(fqdnOrReferer);
|
||||
public static BaseDocument getVertexFqdnDocument(ResultSet resultSet){
|
||||
BaseDocument newDoc = null;
|
||||
if (isDomain(fqdnName)) {
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
newDoc = new BaseDocument();
|
||||
newDoc.setKey(fqdnName);
|
||||
newDoc.addAttribute("FQDN_NAME", fqdnName);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
}
|
||||
return newDoc;
|
||||
}
|
||||
|
||||
public static BaseDocument getVertexIpDocument(ResultSet resultSet) throws SQLException {
|
||||
BaseDocument newDoc = new BaseDocument();
|
||||
String ip = resultSet.getString("IP");
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
long sessionCount = resultSet.getLong("SESSION_COUNT");
|
||||
long bytesSum = resultSet.getLong("BYTES_SUM");
|
||||
String ipType = resultSet.getString("ip_type");
|
||||
newDoc.setKey(ip);
|
||||
newDoc.addAttribute("IP", ip);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
switch (ipType) {
|
||||
case "client":
|
||||
newDoc.addAttribute("CLIENT_SESSION_COUNT", sessionCount);
|
||||
newDoc.addAttribute("CLIENT_BYTES_SUM", bytesSum);
|
||||
newDoc.addAttribute("SERVER_SESSION_COUNT", 0L);
|
||||
newDoc.addAttribute("SERVER_BYTES_SUM", 0L);
|
||||
break;
|
||||
case "server":
|
||||
newDoc.addAttribute("SERVER_SESSION_COUNT", sessionCount);
|
||||
newDoc.addAttribute("SERVER_BYTES_SUM", bytesSum);
|
||||
newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L);
|
||||
newDoc.addAttribute("CLIENT_BYTES_SUM", 0L);
|
||||
break;
|
||||
default:
|
||||
newDoc.addAttribute("SERVER_SESSION_COUNT", 0L);
|
||||
newDoc.addAttribute("SERVER_BYTES_SUM", 0L);
|
||||
newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L);
|
||||
newDoc.addAttribute("CLIENT_BYTES_SUM", 0L);
|
||||
break;
|
||||
}
|
||||
// newDoc.addAttribute("COMMON_LINK_INFO", "");
|
||||
return newDoc;
|
||||
}
|
||||
|
||||
public static BaseDocument getVertexSubscriberDocument(ResultSet resultSet) throws SQLException {
|
||||
String subscriberId = resultSet.getString("common_subscriber_id");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
BaseDocument newDoc = new BaseDocument();
|
||||
newDoc.setKey(subscriberId);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
return newDoc;
|
||||
}
|
||||
|
||||
public static BaseEdgeDocument getRelationshipSubsciberLocateIpDocument(ResultSet resultSet) throws SQLException {
|
||||
String subscriberId = resultSet.getString("common_subscriber_id");
|
||||
String framedIp = resultSet.getString("radius_framed_ip");
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
long countTotal = resultSet.getLong("COUNT_TOTAL");
|
||||
|
||||
String key = subscriberId + "-" + framedIp;
|
||||
BaseEdgeDocument newDoc = new BaseEdgeDocument();
|
||||
newDoc.setKey(key);
|
||||
newDoc.setFrom("SUBSCRIBER/" + subscriberId);
|
||||
newDoc.setTo("IP/" + framedIp);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
newDoc.addAttribute("COUNT_TOTAL", countTotal);
|
||||
|
||||
return newDoc;
|
||||
|
||||
}
|
||||
|
||||
public static BaseEdgeDocument getRelationFqdnAddressIpDocument(ResultSet resultSet) throws SQLException {
|
||||
String vFqdn = resultSet.getString("FQDN");
|
||||
BaseEdgeDocument newDoc = null;
|
||||
if (isDomain(vFqdn)) {
|
||||
String vIp = resultSet.getString("common_server_ip");
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
long countTotal = resultSet.getLong("COUNT_TOTAL");
|
||||
String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray();
|
||||
long[] clientIpTs = new long[distCipRecents.length];
|
||||
for (int i = 0; i < clientIpTs.length; i++) {
|
||||
clientIpTs[i] = currentHour;
|
||||
try {
|
||||
String fqdnOrReferer = resultSet.getString("FQDN");
|
||||
String fqdnName = TopDomainUtils.getDomainFromUrl(fqdnOrReferer);
|
||||
if (isDomain(fqdnName)) {
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
newDoc = new BaseDocument();
|
||||
newDoc.setKey(fqdnName);
|
||||
newDoc.addAttribute("FQDN_NAME", fqdnName);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
}
|
||||
|
||||
String key = vFqdn + "-" + vIp;
|
||||
newDoc = new BaseEdgeDocument();
|
||||
newDoc.setKey(key);
|
||||
newDoc.setFrom("FQDN/" + vFqdn);
|
||||
newDoc.setTo("IP/" + vIp);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
newDoc.addAttribute("CNT_TOTAL",countTotal);
|
||||
newDoc.addAttribute("DIST_CIP", distCipRecents);
|
||||
newDoc.addAttribute("DIST_CIP_TS", clientIpTs);
|
||||
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
return newDoc;
|
||||
}
|
||||
|
||||
public static BaseEdgeDocument getRelationshipFqdnSameFqdnDocument(ResultSet resultSet) throws SQLException {
|
||||
BaseEdgeDocument newDoc = null;
|
||||
String domainFqdn = resultSet.getString("domainFqdn");
|
||||
String referer = resultSet.getString("referer");
|
||||
String refererFqdn = TopDomainUtils.getDomainFromUrl(referer);
|
||||
if (isDomain(refererFqdn) && isDomain(domainFqdn)){
|
||||
public static BaseDocument getVertexIpDocument(ResultSet resultSet){
|
||||
BaseDocument newDoc = new BaseDocument();
|
||||
try {
|
||||
String ip = resultSet.getString("IP");
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
long countTotal = resultSet.getLong("COUNT_TOTAL");
|
||||
String key = domainFqdn + "-" + refererFqdn;
|
||||
newDoc = new BaseEdgeDocument();
|
||||
newDoc.setKey(key);
|
||||
newDoc.setFrom("FQDN/" + domainFqdn);
|
||||
newDoc.setTo("FQDN/" + refererFqdn);
|
||||
long sessionCount = resultSet.getLong("SESSION_COUNT");
|
||||
long bytesSum = resultSet.getLong("BYTES_SUM");
|
||||
String ipType = resultSet.getString("ip_type");
|
||||
newDoc.setKey(ip);
|
||||
newDoc.addAttribute("IP", ip);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
newDoc.addAttribute("CNT_TOTAL",countTotal);
|
||||
switch (ipType) {
|
||||
case "client":
|
||||
newDoc.addAttribute("CLIENT_SESSION_COUNT", sessionCount);
|
||||
newDoc.addAttribute("CLIENT_BYTES_SUM", bytesSum);
|
||||
newDoc.addAttribute("SERVER_SESSION_COUNT", 0L);
|
||||
newDoc.addAttribute("SERVER_BYTES_SUM", 0L);
|
||||
break;
|
||||
case "server":
|
||||
newDoc.addAttribute("SERVER_SESSION_COUNT", sessionCount);
|
||||
newDoc.addAttribute("SERVER_BYTES_SUM", bytesSum);
|
||||
newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L);
|
||||
newDoc.addAttribute("CLIENT_BYTES_SUM", 0L);
|
||||
break;
|
||||
default:
|
||||
newDoc.addAttribute("SERVER_SESSION_COUNT", 0L);
|
||||
newDoc.addAttribute("SERVER_BYTES_SUM", 0L);
|
||||
newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L);
|
||||
newDoc.addAttribute("CLIENT_BYTES_SUM", 0L);
|
||||
break;
|
||||
}
|
||||
// newDoc.addAttribute("COMMON_LINK_INFO", "");
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
return newDoc;
|
||||
}
|
||||
|
||||
public static BaseEdgeDocument getRelationIpVisitFqdnDocument(ResultSet resultSet) throws SQLException {
|
||||
BaseEdgeDocument newDoc = null;
|
||||
String vFqdn = resultSet.getString("FQDN");
|
||||
if (isDomain(vFqdn)) {
|
||||
String vIp = resultSet.getString("common_client_ip");
|
||||
String key = vIp + "-" + vFqdn;
|
||||
public static BaseDocument getVertexSubscriberDocument(ResultSet resultSet){
|
||||
BaseDocument newDoc = new BaseDocument();
|
||||
try {
|
||||
String subscriberId = resultSet.getString("common_subscriber_id");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
newDoc.setKey(subscriberId);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
return newDoc;
|
||||
}
|
||||
|
||||
public static BaseEdgeDocument getRelationshipSubsciberLocateIpDocument(ResultSet resultSet){
|
||||
BaseEdgeDocument newDoc = new BaseEdgeDocument();
|
||||
try {
|
||||
String subscriberId = resultSet.getString("common_subscriber_id");
|
||||
String framedIp = resultSet.getString("radius_framed_ip");
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
long countTotal = resultSet.getLong("COUNT_TOTAL");
|
||||
|
||||
newDoc = new BaseEdgeDocument();
|
||||
String key = subscriberId + "-" + framedIp;
|
||||
newDoc.setKey(key);
|
||||
newDoc.setFrom("IP/" + vIp);
|
||||
newDoc.setTo("FQDN/" + vFqdn);
|
||||
newDoc.addAttribute("CNT_TOTAL",countTotal);
|
||||
newDoc.setFrom("SUBSCRIBER/" + subscriberId);
|
||||
newDoc.setTo("IP/" + framedIp);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
newDoc.addAttribute("COUNT_TOTAL", countTotal);
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
return newDoc;
|
||||
|
||||
}
|
||||
|
||||
public static BaseEdgeDocument getRelationFqdnAddressIpDocument(ResultSet resultSet){
|
||||
BaseEdgeDocument newDoc = null;
|
||||
try {
|
||||
String vFqdn = resultSet.getString("FQDN");
|
||||
if (isDomain(vFqdn)) {
|
||||
String vIp = resultSet.getString("common_server_ip");
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
long countTotal = resultSet.getLong("COUNT_TOTAL");
|
||||
String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray();
|
||||
long[] clientIpTs = new long[distCipRecents.length];
|
||||
for (int i = 0; i < clientIpTs.length; i++) {
|
||||
clientIpTs[i] = currentHour;
|
||||
}
|
||||
|
||||
String key = vFqdn + "-" + vIp;
|
||||
newDoc = new BaseEdgeDocument();
|
||||
newDoc.setKey(key);
|
||||
newDoc.setFrom("FQDN/" + vFqdn);
|
||||
newDoc.setTo("IP/" + vIp);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
newDoc.addAttribute("CNT_TOTAL",countTotal);
|
||||
newDoc.addAttribute("DIST_CIP", distCipRecents);
|
||||
newDoc.addAttribute("DIST_CIP_TS", clientIpTs);
|
||||
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
return newDoc;
|
||||
}
|
||||
|
||||
public static BaseEdgeDocument getRelationshipFqdnSameFqdnDocument(ResultSet resultSet){
|
||||
BaseEdgeDocument newDoc = null;
|
||||
try {
|
||||
String domainFqdn = resultSet.getString("domainFqdn");
|
||||
String referer = resultSet.getString("referer");
|
||||
String refererFqdn = TopDomainUtils.getDomainFromUrl(referer);
|
||||
if (isDomain(refererFqdn) && isDomain(domainFqdn)){
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
long countTotal = resultSet.getLong("COUNT_TOTAL");
|
||||
String key = domainFqdn + "-" + refererFqdn;
|
||||
newDoc = new BaseEdgeDocument();
|
||||
newDoc.setKey(key);
|
||||
newDoc.setFrom("FQDN/" + domainFqdn);
|
||||
newDoc.setTo("FQDN/" + refererFqdn);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
newDoc.addAttribute("CNT_TOTAL",countTotal);
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
return newDoc;
|
||||
}
|
||||
|
||||
public static BaseEdgeDocument getRelationIpVisitFqdnDocument(ResultSet resultSet){
|
||||
BaseEdgeDocument newDoc = null;
|
||||
try {
|
||||
String vFqdn = resultSet.getString("FQDN");
|
||||
if (isDomain(vFqdn)) {
|
||||
String vIp = resultSet.getString("common_client_ip");
|
||||
String key = vIp + "-" + vFqdn;
|
||||
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
|
||||
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
|
||||
long countTotal = resultSet.getLong("COUNT_TOTAL");
|
||||
|
||||
newDoc = new BaseEdgeDocument();
|
||||
newDoc.setKey(key);
|
||||
newDoc.setFrom("IP/" + vIp);
|
||||
newDoc.setTo("FQDN/" + vFqdn);
|
||||
newDoc.addAttribute("CNT_TOTAL",countTotal);
|
||||
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
|
||||
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
return newDoc;
|
||||
}
|
||||
@@ -206,6 +233,12 @@ public class ReadClickhouseData {
|
||||
if (fqdn == null || fqdn.length() == 0){
|
||||
return false;
|
||||
}
|
||||
if (fqdn.contains(":")){
|
||||
String s = fqdn.split(":")[0];
|
||||
if (s.contains(":")){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
String[] fqdnArr = fqdn.split("\\.");
|
||||
if (fqdnArr.length < 4 || fqdnArr.length > 4) {
|
||||
return true;
|
||||
@@ -245,7 +278,6 @@ public class ReadClickhouseData {
|
||||
}
|
||||
|
||||
public static String getVertexFqdnSql() {
|
||||
long[] timeLimit = getTimeLimit();
|
||||
long maxTime = timeLimit[0];
|
||||
long minTime = timeLimit[1];
|
||||
String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime;
|
||||
@@ -255,7 +287,6 @@ public class ReadClickhouseData {
|
||||
}
|
||||
|
||||
public static String getVertexIpSql() {
|
||||
long[] timeLimit = getTimeLimit();
|
||||
long maxTime = timeLimit[0];
|
||||
long minTime = timeLimit[1];
|
||||
String where = " recv_time >= " + minTime + " AND recv_time < " + maxTime;
|
||||
@@ -265,7 +296,6 @@ public class ReadClickhouseData {
|
||||
}
|
||||
|
||||
public static String getRelationshipFqdnAddressIpSql() {
|
||||
long[] timeLimit = getTimeLimit();
|
||||
long maxTime = timeLimit[0];
|
||||
long minTime = timeLimit[1];
|
||||
String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND s1_domain != '' AND s1_d_ip != '' ";
|
||||
@@ -273,7 +303,6 @@ public class ReadClickhouseData {
|
||||
}
|
||||
|
||||
public static String getRelationshipFqdnSameFqdnSql(){
|
||||
long[] timeLimit = getTimeLimit();
|
||||
long maxTime = timeLimit[0];
|
||||
long minTime = timeLimit[1];
|
||||
String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND s1_domain != '' AND s1_referer != '' ";
|
||||
@@ -281,7 +310,6 @@ public class ReadClickhouseData {
|
||||
}
|
||||
|
||||
public static String getRelationshipIpVisitFqdnSql() {
|
||||
long[] timeLimit = getTimeLimit();
|
||||
long maxTime = timeLimit[0];
|
||||
long minTime = timeLimit[1];
|
||||
String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" AND s1_s_ip != '' AND s1_domain != '' ";
|
||||
@@ -289,7 +317,6 @@ public class ReadClickhouseData {
|
||||
}
|
||||
|
||||
public static String getVertexSubscriberSql() {
|
||||
long[] timeLimit = getTimeLimit();
|
||||
long maxTime = timeLimit[0];
|
||||
long minTime = timeLimit[1];
|
||||
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
|
||||
@@ -297,7 +324,6 @@ public class ReadClickhouseData {
|
||||
}
|
||||
|
||||
public static String getRelationshipSubsciberLocateIpSql() {
|
||||
long[] timeLimit = getTimeLimit();
|
||||
long maxTime = timeLimit[0];
|
||||
long minTime = timeLimit[1];
|
||||
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
|
||||
@@ -305,10 +331,19 @@ public class ReadClickhouseData {
|
||||
}
|
||||
|
||||
private static long[] getTimeLimit() {
|
||||
// long maxTime = currentHour;
|
||||
// long minTime = maxTime - 3600;
|
||||
long maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME;
|
||||
long minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME;
|
||||
long maxTime = 0L;
|
||||
long minTime = 0L;
|
||||
switch (ApplicationConfig.TIME_LIMIT_TYPE) {
|
||||
case 0:
|
||||
maxTime = currentHour;
|
||||
minTime = maxTime - ApplicationConfig.UPDATE_INTERVAL;
|
||||
break;
|
||||
case 1:
|
||||
maxTime = ApplicationConfig.READ_CLICKHOUSE_MAX_TIME;
|
||||
minTime = ApplicationConfig.READ_CLICKHOUSE_MIN_TIME;
|
||||
break;
|
||||
default:
|
||||
}
|
||||
return new long[]{maxTime, minTime};
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package cn.ac.iie.service.read;
|
||||
|
||||
import cn.ac.iie.config.ApplicationConfig;
|
||||
import cn.ac.iie.utils.ArangoDBConnect;
|
||||
import com.arangodb.ArangoCursor;
|
||||
import com.arangodb.entity.BaseDocument;
|
||||
@@ -11,6 +12,8 @@ import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static cn.ac.iie.service.read.ReadClickhouseData.RECENT_COUNT_HOUR;
|
||||
|
||||
/**
|
||||
* @author wlh
|
||||
* 多线程全量读取arangoDb历史数据,封装到map
|
||||
@@ -20,12 +23,17 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
|
||||
|
||||
private ArangoDBConnect arangoConnect;
|
||||
private String query;
|
||||
private ConcurrentHashMap<String, T> map;
|
||||
private ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map;
|
||||
private Class<T> type;
|
||||
private String table;
|
||||
private CountDownLatch countDownLatch;
|
||||
|
||||
public ReadHistoryArangoData(ArangoDBConnect arangoConnect, String query, ConcurrentHashMap<String, T> map, Class<T> type, String table,CountDownLatch countDownLatch) {
|
||||
public ReadHistoryArangoData(ArangoDBConnect arangoConnect,
|
||||
String query,
|
||||
ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map,
|
||||
Class<T> type,
|
||||
String table,
|
||||
CountDownLatch countDownLatch) {
|
||||
this.arangoConnect = arangoConnect;
|
||||
this.query = query;
|
||||
this.map = map;
|
||||
@@ -44,16 +52,19 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
|
||||
int i = 0;
|
||||
for (T doc : baseDocuments) {
|
||||
String key = doc.getKey();
|
||||
map.put(key, doc);
|
||||
int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
|
||||
ConcurrentHashMap<String, T> tmpMap = map.get(hashCode);
|
||||
tmpMap.put(key, doc);
|
||||
i++;
|
||||
}
|
||||
long l = System.currentTimeMillis();
|
||||
LOG.info(query + "\n读取数据" + i + "条,运行时间:" + (l - s));
|
||||
LOG.info(query + "\n读取" + i + "条数据,运行时间:" + (l - s));
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}finally {
|
||||
countDownLatch.countDown();
|
||||
LOG.info("本线程读取完毕,剩余线程数量:"+countDownLatch.getCount());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,7 +74,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
|
||||
String protocolRecent = protocol + "_CNT_RECENT";
|
||||
ArrayList<Long> cntRecent = (ArrayList<Long>) doc.getAttribute(protocolRecent);
|
||||
Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]);
|
||||
Long[] cntRecentsDst = new Long[24];
|
||||
Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR];
|
||||
System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1);
|
||||
cntRecentsDst[0] = 0L;
|
||||
doc.addAttribute(protocolRecent, cntRecentsDst);
|
||||
|
||||
@@ -8,7 +8,6 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
@@ -20,30 +19,28 @@ public class Document<T extends BaseDocument> extends Thread{
|
||||
private String collectionName;
|
||||
private ConcurrentHashMap<String, T> historyDocumentMap;
|
||||
private CountDownLatch countDownLatch;
|
||||
private Class<T> type;
|
||||
|
||||
Document(HashMap<String, ArrayList<T>> newDocumentMap,
|
||||
ArangoDBConnect arangoManger,
|
||||
String collectionName,
|
||||
ConcurrentHashMap<String, T> historyDocumentMap,
|
||||
CountDownLatch countDownLatch,
|
||||
Class<T> type) {
|
||||
CountDownLatch countDownLatch) {
|
||||
this.newDocumentMap = newDocumentMap;
|
||||
this.arangoManger = arangoManger;
|
||||
this.collectionName = collectionName;
|
||||
this.historyDocumentMap = historyDocumentMap;
|
||||
this.countDownLatch = countDownLatch;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info(collectionName+" new Map 大小:"+newDocumentMap.size());
|
||||
Set<String> keySet = newDocumentMap.keySet();
|
||||
ArrayList<T> resultDocumentList = new ArrayList<>();
|
||||
int i = 0;
|
||||
long start = System.currentTimeMillis();
|
||||
LOG.info("新读取数据"+newDocumentMap.size()+"条,历史数据"+historyDocumentMap.size()+"条");
|
||||
try {
|
||||
Set<String> keySet = newDocumentMap.keySet();
|
||||
ArrayList<T> resultDocumentList = new ArrayList<>();
|
||||
int i = 0;
|
||||
for (String key : keySet) {
|
||||
ArrayList<T> newDocumentSchemaList = newDocumentMap.getOrDefault(key, null);
|
||||
if (newDocumentSchemaList != null) {
|
||||
@@ -67,6 +64,8 @@ public class Document<T extends BaseDocument> extends Thread{
|
||||
LOG.error(e.toString());
|
||||
}finally {
|
||||
countDownLatch.countDown();
|
||||
long last = System.currentTimeMillis();
|
||||
LOG.info("本线程更新完毕,用时:"+(last-start)+",剩余线程数量:"+countDownLatch.getCount());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,13 +83,12 @@ public class Document<T extends BaseDocument> extends Thread{
|
||||
historyDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime);
|
||||
}
|
||||
|
||||
private T mergeDocument(ArrayList<T> newDocumentSchemaList) throws IllegalAccessException, InstantiationException {
|
||||
private T mergeDocument(ArrayList<T> newDocumentSchemaList){
|
||||
if (newDocumentSchemaList == null || newDocumentSchemaList.isEmpty()){
|
||||
return null;
|
||||
}else if (newDocumentSchemaList.size() == 1){
|
||||
return newDocumentSchemaList.get(0);
|
||||
}else {
|
||||
// T newDocument = type.newInstance();
|
||||
T newDocument = null;
|
||||
for (T lastDoc:newDocumentSchemaList){
|
||||
if (newDocument == null){
|
||||
|
||||
@@ -16,7 +16,7 @@ public class Relationship extends Document<BaseEdgeDocument> {
|
||||
String collectionName,
|
||||
ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
|
||||
CountDownLatch countDownLatch) {
|
||||
super(newDocumentHashMap,arangoManger,collectionName,historyDocumentMap,countDownLatch,BaseEdgeDocument.class);
|
||||
super(newDocumentHashMap,arangoManger,collectionName,historyDocumentMap,countDownLatch);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -18,7 +18,7 @@ public class Vertex extends Document<BaseDocument> {
|
||||
String collectionName,
|
||||
ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
|
||||
CountDownLatch countDownLatch) {
|
||||
super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch,BaseDocument.class);
|
||||
super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user