Fix the iterator timeout issue

yinjiangyi
2021-09-10 14:51:20 +08:00
parent e914511138
commit 828a2b972c
3 changed files with 60 additions and 19 deletions
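The change to DruidData.java below follows a common fix for this class of timeout: copy every row out of the Avatica/JDBC ResultSet into memory first, close the statement, and only then start the (potentially slow) per-IP processing. For reference, here is a minimal, self-contained sketch of that pattern against plain JDBC; the class name, connection URL, and query are placeholders, not the project's code. In the diff, loadFromIterator plays the role of materialize, and selectAll does the grouping afterwards.

import java.sql.*;
import java.util.*;

public class EagerResultSetRead {

    // Copy every row of the ResultSet into plain maps so the statement and
    // connection can be closed before any slow downstream processing begins.
    static List<Map<String, Object>> materialize(ResultSet rs) throws SQLException {
        List<Map<String, Object>> rows = new ArrayList<>();
        ResultSetMetaData md = rs.getMetaData();
        int cols = md.getColumnCount();
        while (rs.next()) {
            Map<String, Object> row = new HashMap<>();
            for (int i = 1; i <= cols; i++) {
                row.put(md.getColumnName(i), rs.getObject(i));
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) throws SQLException {
        List<Map<String, Object>> rows;
        // Placeholder Avatica JDBC URL and query, for illustration only.
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT * FROM some_datasource LIMIT 10")) {
            rows = materialize(rs);
        }
        // The statement and connection are already closed here; processing the
        // in-memory rows can take as long as needed without timing out the iterator.
        rows.forEach(System.out::println);
    }
}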

View File: DruidData.java

@@ -4,6 +4,7 @@ import cn.mesalab.config.ApplicationConfig;
import cn.mesalab.utils.DruidUtils;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -25,18 +26,46 @@ public class DruidData {
    private static final Logger LOG = LoggerFactory.getLogger(DruidData.class);
    public static Map<String, List<Map<String, Object>>> readFromDruid(String sql, AvaticaStatement statement){
        Map<String, List<Map<String, Object>>> rsList = null;
        try{
            ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
            LOG.info("Finished reading data, start processing...");
            rsList = selectAll(resultSet);
        } catch (Exception e){
            e.printStackTrace();
    // Materialize the entire ResultSet into memory so the statement can be closed
    // before the (potentially slow) per-IP processing begins.
    public static ArrayList<Map<String, Object>> loadFromIterator(ResultSet rs) {
        ArrayList<Map<String, Object>> result = new ArrayList<>();
        try {
            ResultSetMetaData rmd = rs.getMetaData();
            int columnCount = rmd.getColumnCount();
            while (rs.next()) {
                Map<String, Object> rowData = new HashMap<>();
                for (int i = 1; i <= columnCount; ++i) {
                    rowData.put(rmd.getColumnName(i), rs.getObject(i));
                }
                result.add(rowData);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return rsList;
        return result;
    }
    public static Map<String, List<Map<String, Object>>> selectAll(List<Map<String, Object>> result) {
        Map<String, List<Map<String, Object>>> allIpDataList = new HashMap<>();
        ArrayList<String> ipList = new ArrayList<>();
        for (Map<String, Object> rowData : result) {
            String ip = (String) rowData.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
            if (!ipList.contains(ip)) {
                ipList.add(ip);
                List<Map<String, Object>> ipData = new ArrayList<>();
                allIpDataList.put(ip, ipData);
            }
            rowData.remove(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
            allIpDataList.get(ip).add(rowData);
        }
        return allIpDataList;
    }
    /**
     * Convert the data read from Druid into the form Map<String, List<Map<String, Object>>>:
     * the outer map is keyed by ip, and each inner map holds one log record for that ip
@@ -72,6 +101,9 @@ public class DruidData {
        return allIpDataList;
    }
    /**
     * Compute the query time range; a range can be specified explicitly (for testing) or taken from the default configuration
     * @return the start and end timestamps of the range
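The time-range selection this javadoc describes is driven by the keys in the properties file at the end of this commit (read.druid.time.limit.type, read.druid.min.time, read.druid.max.time, read.historical.days). A minimal sketch of how that choice might look, returning the start/end pair as a vavr Tuple2 (which this file already imports); the method name, field defaults, and the 7-day look-back are assumptions, not the project's implementation.

import io.vavr.Tuple;
import io.vavr.Tuple2;

public class TimeRangeSketch {
    // Stand-ins for values read from the .properties file shown further down:
    // read.druid.time.limit.type, read.druid.min.time, read.druid.max.time, read.historical.days
    static int timeLimitType = 1;          // 0 = look back N days, 1 = explicit range
    static long minTime = 1630080000000L;  // ms epoch, from read.druid.min.time
    static long maxTime = 1630425600000L;  // ms epoch, from read.druid.max.time
    static int historicalDays = 7;         // assumed default for read.historical.days

    // Return the start and end timestamps (ms) of the Druid query window.
    static Tuple2<Long, Long> queryTimeRange() {
        if (timeLimitType == 1) {
            return Tuple.of(minTime, maxTime);           // explicitly configured range (testing)
        }
        long end = System.currentTimeMillis();
        long start = end - historicalDays * 24L * 60 * 60 * 1000;
        return Tuple.of(start, end);                     // default: last N days up to now
    }
}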

View File: BaselineSingleThread.java

@@ -16,6 +16,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
@@ -66,7 +67,6 @@ public class BaselineSingleThread extends Thread {
    public void run(){
        long start = System.currentTimeMillis();
        // Read the data
        LOG.info("Start reading data");
        Map<String, List<Map<String, Object>>> batchDruidData = new HashMap<>();
        // Druid retry
        try {
@@ -113,10 +113,11 @@ public class BaselineSingleThread extends Thread {
            }
        }
        try {
            hbaseTable.put(putList);
            LOG.info("MONITOR - IP frequency bin counts: " + frequencyBinCounter);
            LOG.info("MONITOR - generated category counts: " + generateTypeCounter);
            LOG.info("MONITOR - baselines not generated: " + discardBaselineCounter + ", involving " + discardIpList.size() + " IPs");
            hbaseTable.put(putList);
            LOG.info("MONITOR - finished writing to HBase");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
@@ -130,13 +131,21 @@ public class BaselineSingleThread extends Thread {
        Map<String, List<Map<String, Object>>> readFromDruid = new HashMap<>();
        try {
            AvaticaConnection connection = DruidUtils.getConn();
            AvaticaStatement stat = connection.createStatement();
            stat.setQueryTimeout(ApplicationConfig.DRUID_STATEMENT_QUERY_TIMEOUT);
            AvaticaStatement statement = connection.createStatement();
            statement.setQueryTimeout(ApplicationConfig.DRUID_STATEMENT_QUERY_TIMEOUT);
            String sql = DruidData.getBatchDruidQuerySql(attackTypeList, currentBatch, batchPartitionRange);
            LOG.debug("Read Druid SQL: " + sql);
            readFromDruid = DruidData.readFromDruid(sql, stat);
            LOG.info("Start reading data");
            ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
            ArrayList<Map<String, Object>> maps = DruidData.loadFromIterator(resultSet);
            connection.close();
            stat.close();
            statement.close();
            LOG.info("Start processing data");
            readFromDruid = DruidData.selectAll(maps);
            LOG.info("Finished processing data");
        } catch (SQLException e){
            e.printStackTrace();
        }
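One design note on the block above: the connection and statement are closed only on the success path, so an exception thrown by executeQuery or loadFromIterator would leave them open. A hedged alternative sketch of the same read path using try-with-resources; DruidUtils, DruidData, ApplicationConfig and the local variables are taken from the diff, while the error log line is illustrative.

try (AvaticaConnection connection = DruidUtils.getConn();
     AvaticaStatement statement = connection.createStatement()) {
    statement.setQueryTimeout(ApplicationConfig.DRUID_STATEMENT_QUERY_TIMEOUT);
    String sql = DruidData.getBatchDruidQuerySql(attackTypeList, currentBatch, batchPartitionRange);
    LOG.info("Start reading data");
    ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
    ArrayList<Map<String, Object>> maps = DruidData.loadFromIterator(resultSet);
    LOG.info("Start processing data");
    readFromDruid = DruidData.selectAll(maps);
    LOG.info("Finished processing data");
} catch (SQLException e) {
    LOG.error("Druid read failed", e);   // illustrative; the diff uses e.printStackTrace()
}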

View File: application configuration (.properties)

@@ -17,9 +17,9 @@ hbase.zookeeper.client.port=2181
# How the Druid read time range is determined
# 0: read the default number of days from read.historical.days
# 1: use an explicitly specified time range
read.druid.time.limit.type=0
read.druid.min.time=1630771200000
read.druid.max.time=1631030400000
read.druid.time.limit.type=1
read.druid.min.time=1630080000000
read.druid.max.time=1630425600000
# Druid field mapping
druid.attacktype.tcpsynflood=TCP SYN Flood
@@ -68,7 +68,7 @@ monitor.frequency.bin.num=100
##########################################
############## Concurrency parameters ###############
##########################################
thread.pool.num=100
thread.pool.num=10
# The maximum value of the Druid partition field partition_num is 9999
druid.statement.query.timeout=36000
druid.partition.num.max=10000
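thread.pool.num was reduced from 100 to 10 in the same commit, presumably to keep fewer long-running Druid queries in flight at once. BaselineSingleThread already imports CountDownLatch; below is a minimal sketch of how a pool sized from this property and a latch might coordinate the per-batch workers. The properties file name, batch count, and worker body are placeholders, not the project's actual scheduler.

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (InputStream in = new FileInputStream("application.properties")) {  // placeholder file name
            props.load(in);
        }
        int poolSize = Integer.parseInt(props.getProperty("thread.pool.num", "10"));
        int batches = 100;  // stand-in for the number of Druid partitions to process
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch latch = new CountDownLatch(batches);
        for (int batch = 0; batch < batches; batch++) {
            final int current = batch;
            pool.submit(() -> {
                try {
                    // query Druid for this batch, build baselines, write to HBase ...
                    System.out.println("processed batch " + current);
                } finally {
                    latch.countDown();  // always count down, even if the batch fails
                }
            });
        }
        latch.await();   // wait for all batches before exiting
        pool.shutdown();
    }
}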