统一日志术语,修改表名
This commit is contained in:
@@ -14,7 +14,7 @@ public class ApplicationConfig {
|
|||||||
public static final Integer ARANGODB_BATCH = ConfigUtils.getIntProperty( "arangoDB.batch");
|
public static final Integer ARANGODB_BATCH = ConfigUtils.getIntProperty( "arangoDB.batch");
|
||||||
|
|
||||||
public static final Integer UPDATE_ARANGO_BATCH = ConfigUtils.getIntProperty("update.arango.batch");
|
public static final Integer UPDATE_ARANGO_BATCH = ConfigUtils.getIntProperty("update.arango.batch");
|
||||||
public static final String ARANGODB_READ_LIMIT = ConfigUtils.getStringProperty("arangoDB.read.limit");
|
public static final Long ARANGODB_READ_LIMIT = ConfigUtils.getLongProperty("arangoDB.read.limit");
|
||||||
|
|
||||||
public static final Integer THREAD_POOL_NUMBER = ConfigUtils.getIntProperty( "thread.pool.number");
|
public static final Integer THREAD_POOL_NUMBER = ConfigUtils.getIntProperty( "thread.pool.number");
|
||||||
public static final Integer THREAD_AWAIT_TERMINATION_TIME = ConfigUtils.getIntProperty( "thread.await.termination.time");
|
public static final Integer THREAD_AWAIT_TERMINATION_TIME = ConfigUtils.getIntProperty( "thread.await.termination.time");
|
||||||
|
|||||||
@@ -129,6 +129,9 @@ public class BaseArangoData {
|
|||||||
private String getQuerySql(Long cnt,int threadNumber, String table){
|
private String getQuerySql(Long cnt,int threadNumber, String table){
|
||||||
long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER + 1;
|
long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER + 1;
|
||||||
long offsetNum = threadNumber * sepNum;
|
long offsetNum = threadNumber * sepNum;
|
||||||
|
if (sepNum > ApplicationConfig.ARANGODB_READ_LIMIT){
|
||||||
|
sepNum = ApplicationConfig.ARANGODB_READ_LIMIT;
|
||||||
|
}
|
||||||
return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc";
|
return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -44,8 +44,8 @@ public class UpdateGraphData {
|
|||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class,
|
// updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class,
|
||||||
ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
|
// ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
|
||||||
|
|
||||||
updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class,
|
updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class,
|
||||||
ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
|
ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
|
||||||
@@ -79,8 +79,8 @@ public class UpdateGraphData {
|
|||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
updateDocument("FQDN", Fqdn.class,BaseDocument.class,
|
// updateDocument("FQDN", Fqdn.class,BaseDocument.class,
|
||||||
ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
|
// ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
|
||||||
|
|
||||||
updateDocument("IP", Ip.class,BaseDocument.class,
|
updateDocument("IP", Ip.class,BaseDocument.class,
|
||||||
ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
|
ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
|
||||||
|
|||||||
@@ -257,40 +257,40 @@ public class ReadClickhouseData {
|
|||||||
|
|
||||||
public static String getVertexFqdnSql() {
|
public static String getVertexFqdnSql() {
|
||||||
String where = "common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
String where = "common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
||||||
String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni";
|
String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni";
|
||||||
String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host";
|
String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host";
|
||||||
return "SELECT FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME FROM ((" + sslSql + ") UNION ALL (" + httpSql + ")) GROUP BY FQDN HAVING FQDN != ''";
|
return "SELECT FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME FROM ((" + sslSql + ") UNION ALL (" + httpSql + ")) GROUP BY FQDN HAVING FQDN != ''";
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getVertexIpSql() {
|
public static String getVertexIpSql() {
|
||||||
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
||||||
String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
|
String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info_c2s) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.session_record where " + where + " group by IP";
|
||||||
String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
|
String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info_s2c) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.session_record where " + where + " group by IP";
|
||||||
return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
|
return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getRelationshipFqdnAddressIpSql() {
|
public static String getRelationshipFqdnAddressIpSql() {
|
||||||
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
||||||
String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip";
|
String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip";
|
||||||
String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip";
|
String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip";
|
||||||
return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''";
|
return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''";
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getRelationshipIpVisitFqdnSql() {
|
public static String getRelationshipIpVisitFqdnSql() {
|
||||||
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
||||||
String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip";
|
String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS schema_type FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip";
|
||||||
String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip";
|
String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'TLS' AS schema_type FROM tsg_galaxy_v3.session_record WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip";
|
||||||
return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''";
|
return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''";
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getVertexSubscriberSql() {
|
public static String getVertexSubscriberSql() {
|
||||||
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
|
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
|
||||||
return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id";
|
return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record WHERE" + where + " GROUP BY common_subscriber_id";
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getRelationshipSubsciberLocateIpSql() {
|
public static String getRelationshipSubsciberLocateIpSql() {
|
||||||
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
|
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
|
||||||
return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id,radius_framed_ip";
|
return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record WHERE" + where + " GROUP BY common_subscriber_id,radius_framed_ip";
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long[] getTimeLimit() {
|
private static long[] getTimeLimit() {
|
||||||
|
|||||||
@@ -33,10 +33,10 @@ public class Ip extends Vertex {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument) {
|
private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument) {
|
||||||
putSumAttribute(newDocument, historyDocument, "CLIENT_SESSION_COUNT");
|
// putSumAttribute(newDocument, historyDocument, "CLIENT_SESSION_COUNT");
|
||||||
putSumAttribute(newDocument, historyDocument, "CLIENT_BYTES_SUM");
|
// putSumAttribute(newDocument, historyDocument, "CLIENT_BYTES_SUM");
|
||||||
putSumAttribute(newDocument, historyDocument, "SERVER_SESSION_COUNT");
|
// putSumAttribute(newDocument, historyDocument, "SERVER_SESSION_COUNT");
|
||||||
putSumAttribute(newDocument, historyDocument, "SERVER_BYTES_SUM");
|
// putSumAttribute(newDocument, historyDocument, "SERVER_BYTES_SUM");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ public class IpLearningApplicationTest {
|
|||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
UpdateGraphData updateGraphData = new UpdateGraphData();
|
UpdateGraphData updateGraphData = new UpdateGraphData();
|
||||||
// updateGraphData.updateArango();
|
|
||||||
updateGraphData.updateArango2();
|
updateGraphData.updateArango2();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,15 +1,13 @@
|
|||||||
#arangoDB参数配置
|
#arangoDB参数配置
|
||||||
arangoDB.host=192.168.40.182
|
arangoDB.host=192.168.44.12
|
||||||
#arangoDB.host=192.168.40.224
|
|
||||||
arangoDB.port=8529
|
arangoDB.port=8529
|
||||||
arangoDB.user=upsert
|
arangoDB.user=root
|
||||||
arangoDB.password=ceiec2018
|
arangoDB.password=ceiec2019
|
||||||
arangoDB.DB.name=ip-learning-test
|
arangoDB.DB.name=tsg_galaxy_v3
|
||||||
#arangoDB.DB.name=tsg_galaxy_v3
|
|
||||||
arangoDB.batch=100000
|
arangoDB.batch=100000
|
||||||
arangoDB.ttl=3600
|
arangoDB.ttl=3600
|
||||||
|
|
||||||
arangoDB.read.limit=
|
arangoDB.read.limit=10000000
|
||||||
update.arango.batch=10000
|
update.arango.batch=10000
|
||||||
|
|
||||||
thread.pool.number=10
|
thread.pool.number=10
|
||||||
|
|||||||
@@ -1,9 +1,7 @@
|
|||||||
drivers=ru.yandex.clickhouse.ClickHouseDriver
|
drivers=ru.yandex.clickhouse.ClickHouseDriver
|
||||||
mdb.user=default
|
mdb.user=default
|
||||||
db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000
|
db.id=192.168.44.67:8123/tsg_galaxy_v3?socket_timeout=3600000
|
||||||
mdb.password=111111
|
mdb.password=ceiec2019
|
||||||
#db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000
|
|
||||||
#mdb.password=ceiec2019
|
|
||||||
initialsize=1
|
initialsize=1
|
||||||
minidle=1
|
minidle=1
|
||||||
maxactive=50
|
maxactive=50
|
||||||
|
|||||||
@@ -1,53 +0,0 @@
|
|||||||
package cn.ac.iie;
|
|
||||||
|
|
||||||
import cn.ac.iie.dao.BaseArangoData;
|
|
||||||
import cn.ac.iie.utils.ArangoDBConnect;
|
|
||||||
import cn.ac.iie.utils.ExecutorThreadPool;
|
|
||||||
import com.arangodb.entity.BaseDocument;
|
|
||||||
import com.arangodb.entity.BaseEdgeDocument;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.Enumeration;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
public class TestReadArango {
|
|
||||||
private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance();
|
|
||||||
private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
|
|
||||||
|
|
||||||
private static BaseArangoData baseArangoData = new BaseArangoData();
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReadFqdnFromArango() {
|
|
||||||
ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyData =
|
|
||||||
baseArangoData.readHistoryData("FQDN", BaseDocument.class);
|
|
||||||
printMap(historyData);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReadFqdnLocIpFromArango() {
|
|
||||||
ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> ip =
|
|
||||||
baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", BaseEdgeDocument.class);
|
|
||||||
printMap(ip);
|
|
||||||
}
|
|
||||||
|
|
||||||
private <T extends BaseDocument> void printMap(ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyData) {
|
|
||||||
ConcurrentHashMap<String, T> map = historyData.get(2);
|
|
||||||
Enumeration<String> keys = map.keys();
|
|
||||||
while (keys.hasMoreElements()) {
|
|
||||||
String key = keys.nextElement();
|
|
||||||
T document = map.get(key);
|
|
||||||
System.out.println(document.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void clearSource() {
|
|
||||||
pool.shutdown();
|
|
||||||
arangoManger.clean();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user