根据server与client IP类型区分链路信息
This commit is contained in:
@@ -66,9 +66,11 @@ public class ReadClickhouseData {
|
|||||||
long bytesSum = resultSet.getLong("BYTES_SUM");
|
long bytesSum = resultSet.getLong("BYTES_SUM");
|
||||||
String ipType = resultSet.getString("ip_type");
|
String ipType = resultSet.getString("ip_type");
|
||||||
String[] commonLinkInfos = (String[]) resultSet.getArray("common_link_info").getArray();
|
String[] commonLinkInfos = (String[]) resultSet.getArray("common_link_info").getArray();
|
||||||
String commonLinkInfo = "";
|
String commonLinkInfo;
|
||||||
if (commonLinkInfos.length > 1){
|
if (commonLinkInfos.length > 1 && !commonLinkInfos[1].equals("")){
|
||||||
commonLinkInfo = commonLinkInfos[1];
|
commonLinkInfo = commonLinkInfos[1];
|
||||||
|
}else {
|
||||||
|
commonLinkInfo = commonLinkInfos[0];
|
||||||
}
|
}
|
||||||
newDoc.setKey(ip);
|
newDoc.setKey(ip);
|
||||||
newDoc.addAttribute("IP", ip);
|
newDoc.addAttribute("IP", ip);
|
||||||
@@ -264,8 +266,8 @@ public class ReadClickhouseData {
|
|||||||
|
|
||||||
public static String getVertexIpSql() {
|
public static String getVertexIpSql() {
|
||||||
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
|
||||||
String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
|
String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info_c2s) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
|
||||||
String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
|
String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info_s2c) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
|
||||||
return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
|
return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
@@ -73,7 +74,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
|
|||||||
LOG.info(query + "\n读取" + i + "条数据,运行时间:" + (l - s));
|
LOG.info(query + "\n读取" + i + "条数据,运行时间:" + (l - s));
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
LOG.error(Arrays.toString(e.getStackTrace()));
|
||||||
} finally {
|
} finally {
|
||||||
countDownLatch.countDown();
|
countDownLatch.countDown();
|
||||||
LOG.info("本线程读取完毕,剩余线程数量:" + countDownLatch.getCount());
|
LOG.info("本线程读取完毕,剩余线程数量:" + countDownLatch.getCount());
|
||||||
@@ -97,6 +98,9 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
|
|||||||
private void deleteDistinctClientIpByTime(T doc) {
|
private void deleteDistinctClientIpByTime(T doc) {
|
||||||
ArrayList<String> distCip = (ArrayList<String>) doc.getAttribute("DIST_CIP");
|
ArrayList<String> distCip = (ArrayList<String>) doc.getAttribute("DIST_CIP");
|
||||||
ArrayList<Long> distCipTs = (ArrayList<Long>) doc.getAttribute("DIST_CIP_TS");
|
ArrayList<Long> distCipTs = (ArrayList<Long>) doc.getAttribute("DIST_CIP_TS");
|
||||||
|
if (distCip == null || distCip.isEmpty()){
|
||||||
|
return;
|
||||||
|
}
|
||||||
distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600);
|
distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600);
|
||||||
Collections.sort(distCipTs);
|
Collections.sort(distCipTs);
|
||||||
Collections.reverse(distCipTs);
|
Collections.reverse(distCipTs);
|
||||||
|
|||||||
@@ -4,7 +4,8 @@ arangoDB.host=192.168.40.182
|
|||||||
arangoDB.port=8529
|
arangoDB.port=8529
|
||||||
arangoDB.user=upsert
|
arangoDB.user=upsert
|
||||||
arangoDB.password=ceiec2018
|
arangoDB.password=ceiec2018
|
||||||
arangoDB.DB.name=ip-learning-test
|
#arangoDB.DB.name=ip-learning-test
|
||||||
|
arangoDB.DB.name=ip-learning-test-0
|
||||||
#arangoDB.DB.name=tsg_galaxy_v3
|
#arangoDB.DB.name=tsg_galaxy_v3
|
||||||
arangoDB.batch=100000
|
arangoDB.batch=100000
|
||||||
arangoDB.ttl=3600
|
arangoDB.ttl=3600
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
drivers=ru.yandex.clickhouse.ClickHouseDriver
|
drivers=ru.yandex.clickhouse.ClickHouseDriver
|
||||||
mdb.user=default
|
mdb.user=default
|
||||||
db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000
|
#db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000
|
||||||
mdb.password=111111
|
#mdb.password=111111
|
||||||
|
|
||||||
|
db.id=192.168.44.10:8124/tsg_galaxy_v3?socket_timeout=300000&compress=0
|
||||||
#db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000
|
#db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000
|
||||||
#mdb.password=ceiec2019
|
mdb.password=ceiec2019
|
||||||
initialsize=1
|
initialsize=1
|
||||||
minidle=1
|
minidle=1
|
||||||
maxactive=50
|
maxactive=50
|
||||||
|
|||||||
@@ -6,5 +6,6 @@ import com.arangodb.entity.BaseEdgeDocument;
|
|||||||
public class readHistoryDataTest {
|
public class readHistoryDataTest {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
BaseArangoData baseArangoData = new BaseArangoData();
|
BaseArangoData baseArangoData = new BaseArangoData();
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user