增加IP vertex COMMON_LINK_INFO属性

This commit is contained in:
wanglihui
2020-08-05 14:09:39 +08:00
parent 1cd1fc66eb
commit 66e1972a97
7 changed files with 27 additions and 21 deletions

View File

@@ -14,6 +14,7 @@ public class ApplicationConfig {
// Settings below are looked up once through ConfigUtils using the quoted property key.
public static final Integer ARANGODB_BATCH = ConfigUtils.getIntProperty( "arangoDB.batch");
public static final Integer UPDATE_ARANGO_BATCH = ConfigUtils.getIntProperty("update.arango.batch");
// Raw text that BaseArangoData splices into its read query immediately before "RETURN doc";
// the accompanying properties file sets it to "limit 100".
public static final String ARANGODB_READ_LIMIT = ConfigUtils.getStringProperty("arangoDB.read.limit");
// BaseArangoData divides (maxTime - minTime) by this value to size each thread's time slice.
public static final Integer THREAD_POOL_NUMBER = ConfigUtils.getIntProperty( "thread.pool.number");
// NOTE(review): the time unit is not visible in this view - confirm where the value is consumed.
public static final Integer THREAD_AWAIT_TERMINATION_TIME = ConfigUtils.getIntProperty( "thread.await.termination.time");

View File

@@ -87,7 +87,7 @@ public class BaseArangoData {
long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER;
long maxThreadTime = minTime + (threadNumber + 1)* diffTime;
long minThreadTime = minTime + threadNumber * diffTime;
return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc";
return "FOR doc IN "+table+" filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc";
}
}

View File

@@ -65,6 +65,11 @@ public class ReadClickhouseData {
long sessionCount = resultSet.getLong("SESSION_COUNT");
long bytesSum = resultSet.getLong("BYTES_SUM");
String ipType = resultSet.getString("ip_type");
String[] commonLinkInfos = (String[]) resultSet.getArray("common_link_info").getArray();
String commonLinkInfo = "";
if (commonLinkInfos.length > 1){
commonLinkInfo = commonLinkInfos[1];
}
newDoc.setKey(ip);
newDoc.addAttribute("IP", ip);
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
@@ -84,7 +89,7 @@ public class ReadClickhouseData {
break;
default:
}
newDoc.addAttribute("COMMON_LINK_INFO", "");
newDoc.addAttribute("COMMON_LINK_INFO", commonLinkInfo);
} catch (Exception e) {
e.printStackTrace();
}
@@ -258,8 +263,8 @@ public class ReadClickhouseData {
/**
 * Builds the SQL text of the per-IP aggregation query.
 * <p>
 * The statement is a UNION ALL of two sub-queries over
 * tsg_galaxy_v3.connection_record_log: one grouped by common_client_ip and
 * tagged 'client', one grouped by common_server_ip and tagged 'server'.
 * Both select IP, FIRST_FOUND_TIME, LAST_FOUND_TIME, SESSION_COUNT,
 * BYTES_SUM and ip_type, and both are limited to rows with
 * minTime <= common_recv_time < maxTime.
 *
 * @return the assembled SQL string
 */
public static String getVertexIpSql() {
// Time window shared by both sub-queries; minTime and maxTime are defined outside this method.
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
// NOTE(review): clientIpSql and serverIpSql are each declared twice in this listing. This looks like
// the before/after lines of a diff shown without +/- markers - confirm against the real file.
// First pair: no common_link_info column.
String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
// Second pair: same columns plus groupUniqArray(2)(common_link_info) exposed as common_link_info.
String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
// Client and server results are concatenated, not merged: an IP seen in both roles yields two rows.
return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
}

View File

@@ -124,5 +124,10 @@ public class Document<T extends BaseDocument> extends Thread{
lastDoc.addAttribute(attribute,firstSumAttribute+lastSumAttribute);
}
/**
 * Copies one attribute from {@code firstDoc} onto {@code lastDoc}.
 * <p>
 * The value is read with {@code firstDoc.getAttribute(attribute)} and passed,
 * unchanged (including {@code null}), to {@code lastDoc.addAttribute}.
 *
 * @param firstDoc  document the value is read from
 * @param lastDoc   document the value is written to
 * @param attribute name of the attribute to copy
 */
protected void replaceAttribute(T firstDoc,T lastDoc,String attribute){
    lastDoc.addAttribute(attribute, firstDoc.getAttribute(attribute));
}
}

View File

@@ -23,19 +23,13 @@ public class Ip extends Vertex {
// Update step for an Ip vertex: parent update, then the per-type update, then the COMMON_LINK_INFO copy.
protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) {
// Run the parent class's update logic first.
super.updateFunction(newDocument, historyDocument);
// Apply the IP-specific update defined in this class.
updateIpByType(newDocument, historyDocument);
// Copy COMMON_LINK_INFO from newDocument onto historyDocument
// (replaceAttribute reads from its first argument and writes to its second).
super.replaceAttribute(newDocument,historyDocument,"COMMON_LINK_INFO");
}
/**
 * Merge step for an Ip vertex: runs the parent class's merge, then the
 * IP-specific merge.
 * <p>
 * NOTE(review): this listing comes from a diff whose hunk header reports
 * 19 old / 13 new lines and carries no +/- markers, so the call to
 * mergeIpByType below may be a removed line - confirm against the real file.
 */
@Override
protected void mergeFunction(BaseDocument lastDoc, BaseDocument newDocument) {
super.mergeFunction(lastDoc, newDocument);
mergeIpByType(lastDoc, newDocument);
}
/**
 * As listed: calls putSumAttribute for the four client/server counters and
 * then updateIpByType with the same two documents.
 * <p>
 * NOTE(review): this listing comes from a diff whose hunk header reports
 * 19 old / 13 new lines and carries no +/- markers. Part of this method
 * (possibly all of it except the updateIpByType call) may be removed code -
 * confirm against the real file before relying on this description.
 */
private void mergeIpByType(BaseDocument lastDoc, BaseDocument newDocument) {
// Presumably adds newDocument's counter to lastDoc's for each key - verify in putSumAttribute.
putSumAttribute(lastDoc,newDocument,"CLIENT_SESSION_COUNT");
putSumAttribute(lastDoc,newDocument,"CLIENT_BYTES_SUM");
putSumAttribute(lastDoc,newDocument,"SERVER_SESSION_COUNT");
putSumAttribute(lastDoc,newDocument,"SERVER_BYTES_SUM");
updateIpByType(lastDoc, newDocument);
}
private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument) {

View File

@@ -2,13 +2,14 @@
#arangoDB.host=192.168.40.182
arangoDB.host=192.168.40.224
arangoDB.port=8529
arangoDB.user=root
arangoDB.password=111111
arangoDB.user=upsert
arangoDB.password=ceiec2018
arangoDB.DB.name=ip-learning-test
#arangoDB.DB.name=ip-learning-test-0
#arangoDB.DB.name=tsg_galaxy_v3
arangoDB.batch=100000
arangoDB.ttl=3600
arangoDB.read.limit=limit 100
update.arango.batch=10000
thread.pool.number=10
@@ -16,9 +17,9 @@ thread.await.termination.time=10
# ClickHouse read time-range mode: 0 = read the past hour, 1 = use the specified time range
time.limit.type=1
read.clickhouse.max.time=1596097930
read.clickhouse.min.time=1593676894
time.limit.type=0
read.clickhouse.max.time=1595833062
read.clickhouse.min.time=1595833060
update.interval=3600
distinct.client.ip.num=10000

View File

@@ -1,9 +1,9 @@
drivers=ru.yandex.clickhouse.ClickHouseDriver
db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000
#db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000
mdb.user=default
mdb.password=ceiec2019
#db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000
#mdb.password=111111
db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000
mdb.password=ceiec2019
initialsize=1
minidle=1
maxactive=50