Fix parallel data reading not taking effect

yinjiangyi
2021-08-04 18:51:42 +08:00
parent d60d5f5e43
commit 1fa03fdb8f
5 changed files with 43 additions and 39 deletions

View File

@@ -31,12 +31,12 @@ public class ApplicationConfig {
     public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname");
     public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname");
-    public static final float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
-    public static final float BASELINE_HISTORICAL_RATIO = ConfigUtils.getFloatProperty("baseline.historical.ratio.threshold");
-    public static final float BASELINE_SPARSE_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.historical.sparse.fill.percentile");
+    public static final Float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
+    public static final Float BASELINE_HISTORICAL_RATIO = ConfigUtils.getFloatProperty("baseline.historical.ratio.threshold");
+    public static final Float BASELINE_SPARSE_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.historical.sparse.fill.percentile");
     public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function");
     public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days");
-    public static final float BASELINE_RATIONAL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.rational.percentile");
+    public static final Float BASELINE_RATIONAL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.rational.percentile");
     public static final String HBASE_TABLE = ConfigUtils.getStringProperty("hbase.table");
@@ -47,15 +47,13 @@ public class ApplicationConfig {
     public static final Double BASELINE_KALMAN_P = ConfigUtils.getDoubleProperty("baseline.kalman.p");
     public static final Double BASELINE_KALMAN_R = ConfigUtils.getDoubleProperty("baseline.kalman.r");
     public static final Integer LOG_WRITE_COUNT = ConfigUtils.getIntProperty("log.write.count");
-    public static final Integer GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("generate.batch.size");
+    public static final Integer BASELINE_GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("baseline.generate.batch.size");
+    public static final Long DRUID_READ_BATCH_TIME_GRAD_HOUR = ConfigUtils.getLongProperty("druid.read.batch.time.grad.hour");
     public static final Integer THREAD_MAX_NUM = ConfigUtils.getIntProperty("thread.max.num");

     // http config
     public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout");
     public static final Integer HTTP_RESPONSE_TIMEOUT = ConfigUtils.getIntProperty("http.response.timeout");
     public static final Integer HTTP_CONNECTION_TIMEOUT = ConfigUtils.getIntProperty("http.connection.timeout");
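Note: the switch from primitive float to boxed Float matters at class-initialization time. If ConfigUtils.getFloatProperty returns null for a missing key, auto-unboxing that null into a primitive float field throws a NullPointerException while ApplicationConfig is loading. A minimal sketch of such a null-tolerant getter, assuming a java.util.Properties backing store (the Properties wiring is illustrative, not this repo's actual ConfigUtils):

import java.util.Properties;

public final class ConfigUtilsSketch {
    private static final Properties PROPS = new Properties();

    // Returns null when the key is absent. A boxed Float constant tolerates
    // that; a primitive float field would NPE during static initialization
    // the moment the null return value is auto-unboxed.
    public static Float getFloatProperty(String key) {
        String value = PROPS.getProperty(key);
        return value == null ? null : Float.valueOf(value);
    }
}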

View File

@@ -1,5 +1,7 @@
 package cn.mesalab.dao;

+import cn.mesalab.utils.DruidUtils;
+import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -7,14 +9,13 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;

 /**
  * @author yjy
  * @version 1.0
  * @date 2021/8/3 8:10 PM
  */
-public class ReadHistoricalDruidData implements Callable {
+public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, Object>>> {

     private static final Logger LOG = LoggerFactory.getLogger(ReadHistoricalDruidData.class);
     private String sql;
@@ -33,8 +34,9 @@ public class ReadHistoricalDruidData implements Callable {
         ArrayList<Map<String, Object>> resultData = new ArrayList<>();
         try {
             long start = System.currentTimeMillis();
-            resultData.addAll(DruidData.readFromDruid(sql, statement));
+            AvaticaConnection connection = DruidUtils.getConn();
+            AvaticaStatement stat = connection.createStatement();
+            resultData.addAll(DruidData.readFromDruid(sql, stat));
             long end = System.currentTimeMillis();
             LOG.info(sql + "\nRead " + resultData.size() + " rows, elapsed: " + (end - start));
         } catch (Exception e) {
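This is the heart of the fix. Previously every ReadHistoricalDruidData task funneled its query through the one shared druidStatement, and JDBC statements are not safe for concurrent use, so the "parallel" reads were effectively serialized. Each call() now opens its own connection and statement via DruidUtils.getConn(). A self-contained sketch of the per-task pattern, written against plain JDBC instead of this repo's Avatica/Druid utilities; the try-with-resources cleanup is an addition here, since the committed code never closes what it opens:

import java.sql.*;
import java.util.*;
import java.util.concurrent.Callable;

public class PerTaskQuery implements Callable<List<Map<String, Object>>> {
    private final String url;
    private final String sql;

    public PerTaskQuery(String url, String sql) {
        this.url = url;
        this.sql = sql;
    }

    @Override
    public List<Map<String, Object>> call() throws Exception {
        List<Map<String, Object>> rows = new ArrayList<>();
        // One connection/statement per task: sharing a single Statement
        // across threads is what kept the parallel read from working.
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            ResultSetMetaData meta = rs.getMetaData();
            while (rs.next()) {
                Map<String, Object> row = new HashMap<>(meta.getColumnCount());
                for (int i = 1; i <= meta.getColumnCount(); i++) {
                    row.put(meta.getColumnLabel(i), rs.getObject(i));
                }
                rows.add(row);
            }
        }
        return rows;
    }
}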

View File

@@ -27,7 +27,7 @@ import java.util.concurrent.*;
 public class BaselineGeneration {
     private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);
-    private static AvaticaConnection druidConn = DruidUtils.getConn();
+    private static final AvaticaConnection druidConn = DruidUtils.getConn();
     private static AvaticaStatement druidStatement;

     static {
@@ -38,9 +38,9 @@ public class BaselineGeneration {
         }
     }

-    private static Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();
+    private static final Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();

-    private static List<String> attackTypeList = Arrays.asList(
+    private static final List<String> ATTACK_TYPE_LIST = Arrays.asList(
             ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD,
             ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
             ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
@@ -49,13 +49,13 @@ public class BaselineGeneration {
     private static final Integer BASELINE_POINT_NUM =
             ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
-    private static Tuple2<Long, Long> startEndTimes = DruidData.getTimeLimit();
-    private static String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " >= MILLIS_TO_TIMESTAMP(" + startEndTimes._2
+    private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit();
+    private static final String TIME_FILTER = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+            + " >= MILLIS_TO_TIMESTAMP(" + START_END_TIMES._2
             + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " < MILLIS_TO_TIMESTAMP(" + startEndTimes._1 + ")";
+            + " < MILLIS_TO_TIMESTAMP(" + START_END_TIMES._1 + ")";
-    private static ArrayList<Map<String, Object>> allFromDruid = new ArrayList<>();
+    private static final ArrayList<Map<String, Object>> allFromDruid = new ArrayList<>();

     /**
      * Program execution
@@ -90,27 +90,35 @@ public class BaselineGeneration {
         // Read data
         LOG.info("Druid: starting data read");
         long start = System.currentTimeMillis();
-        // allFromDruid = DruidData.readAllFromDruid(druidConn, druidStatement, timeFilter);
         ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder()
                 .setNameFormat("baseline-load-data-%d").build();
         ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor(
                 threadNum, threadNum, 0L,
                 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory,
                 new ThreadPoolExecutor.AbortPolicy());
-        long timeGrad = (startEndTimes._1 - startEndTimes._2)/threadNum;
-        for (int i = 0; i < threadNum; i++) {
-            String sql = DruidData.getDruidQuerySql(startEndTimes._1, i, timeGrad);
+        long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR;
+        ArrayList<Future<ArrayList<Map<String, Object>>>> resultList = new ArrayList<>();
+        for (int i = 0; i < (START_END_TIMES._1 - START_END_TIMES._2)/timeGrad; i++) {
+            String sql = DruidData.getDruidQuerySql(START_END_TIMES._1, i, timeGrad);
             ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
                     sql,
                     druidStatement
             );
-            Future<List<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData);
+            Future<ArrayList<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData);
             resultList.add(future);
         }
         for (Future<ArrayList<Map<String, Object>>> future : resultList) {
             try {
-                allFromDruid.addAll(future.get());
+                if (future.get() != null) {
+                    allFromDruid.addAll(future.get());
+                } else {
+                    LOG.error("future.get() returned no result");
+                }
             } catch (ExecutionException e) {
                 e.printStackTrace();
             }
         }
         long last = System.currentTimeMillis();
         LOG.info("Druid: total data load time " + (last - start));
         loadDataExecutor.shutdown();
@@ -128,10 +136,10 @@ public class BaselineGeneration {
                 new ThreadPoolExecutor.AbortPolicy());
         LOG.info("Queried " + destinationIps.size() + " server-side IPs");
-        LOG.info("Baseline batch size: " + ApplicationConfig.GENERATE_BATCH_SIZE);
+        LOG.info("Baseline batch size: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);

         // Generate and process IP baselines in batches
-        List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.GENERATE_BATCH_SIZE);
+        List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
         for (List<String> batchIps : batchIpLists) {
             if (batchIps.size() > 0) {
@@ -140,9 +148,9 @@ public class BaselineGeneration {
                         druidConn,
                         druidStatement,
                         hbaseTable,
-                        attackTypeList,
+                        ATTACK_TYPE_LIST,
                         BASELINE_POINT_NUM,
-                        timeFilter,
+                        TIME_FILTER,
                         allFromDruid
                 );
                 generationExecutor.execute(baselineSingleThread);
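Taken together, BaselineGeneration now slices the read window [START_END_TIMES._2, START_END_TIMES._1) into fixed batches of DRUID_READ_BATCH_TIME_GRAD_HOUR hours, submits one ReadHistoricalDruidData per slice, and joins the futures in order. One caveat worth noting: the loop bound (START_END_TIMES._1 - START_END_TIMES._2)/timeGrad uses integer division, so a window that is not a whole multiple of the slice width silently drops the trailing partial slice. Below is a sketch of the same fan-out/join shape with a stubbed slice reader; querySlice and the literal values are illustrative, with pool size and slice width taken from the config at the end of this commit:

import java.util.*;
import java.util.concurrent.*;

public class SlicedReadSketch {
    // Stub standing in for the Druid query a ReadHistoricalDruidData runs.
    static List<Map<String, Object>> querySlice(long beginMs, long endMs) {
        return Collections.emptyList();
    }

    public static void main(String[] args) throws Exception {
        long maxMs = 1625673600000L;      // read.druid.max.time
        long minMs = 1625414400000L;      // read.druid.min.time
        long gradMs = 4L * 3600_000L;     // druid.read.batch.time.grad.hour = 4
        ExecutorService pool = Executors.newFixedThreadPool(20); // thread.max.num

        // Fan out: one task per time slice, newest slice first.
        List<Future<List<Map<String, Object>>>> futures = new ArrayList<>();
        for (int i = 0; i < (maxMs - minMs) / gradMs; i++) {
            final long end = maxMs - i * gradMs;
            final long begin = end - gradMs;
            futures.add(pool.submit(() -> querySlice(begin, end)));
        }

        // Join in submission order, tolerating tasks that produced nothing.
        List<Map<String, Object>> all = new ArrayList<>();
        for (Future<List<Map<String, Object>>> future : futures) {
            List<Map<String, Object>> part = future.get(); // blocks per slice
            if (part != null) {
                all.addAll(part);
            }
        }
        pool.shutdown();
        System.out.println("slices=" + futures.size() + ", rows=" + all.size());
    }
}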

View File

@@ -74,7 +74,7 @@ public class BaselineSingleThread extends Thread {
         }
         try {
             hbaseTable.put(putList);
-            LOG.info("Baseline thread " + Thread.currentThread().getId() + " successfully wrote " + putList.size() + " baseline rows in total");
+            LOG.info("Successfully wrote " + putList.size() + " baseline rows in total");
         } catch (IOException e) {
             e.printStackTrace();
         }

View File

@@ -22,15 +22,12 @@ hbase.zookeeper.client.port=2181
 # How the druid read window is chosen: 0 = default range of read.druid.time.range days, 1 = explicit range below
 read.druid.time.limit.type=1
-#07-01
+#07-05
 read.druid.min.time=1625414400000
 #06-01
 #read.druid.min.time=1622476800000
 read.druid.max.time=1625673600000
-thread.max.num=5
 # Read the past N days of data; the minimum is 3 days, since periodicity must be checked
 read.historical.days=3
 # Historical data is aggregated at 10-minute granularity
@@ -52,11 +49,10 @@ baseline.rational.percentile=0.95
 baseline.kalman.p=0.000001
 baseline.kalman.r=4
 # Print a log line for every 1000 records written
 log.write.count=10000
-# FOR TEST
-generate.batch.size=100
+baseline.generate.batch.size=1000
+druid.read.batch.time.grad.hour=4
+thread.max.num=20
 # http client config
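With the window configured above, the batching works out evenly: read.druid.max.time - read.druid.min.time = 1625673600000 - 1625414400000 = 259200000 ms, i.e. 72 hours (the 3 days of read.historical.days). At druid.read.batch.time.grad.hour=4 that yields 72 / 4 = 18 read tasks, which fit in a single wave on the 20-thread pool (thread.max.num=20), and nothing is lost to the integer division noted earlier.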