Druid等待重连

This commit is contained in:
yinjiangyi
2021-08-09 16:54:30 +08:00
parent 357fc2eabc
commit 8899dac890
4 changed files with 107 additions and 16 deletions

View File

@@ -59,6 +59,8 @@ public class ApplicationConfig {
public static final Integer THREAD_POOL_NUM = ConfigUtils.getIntProperty("thread.pool.num"); public static final Integer THREAD_POOL_NUM = ConfigUtils.getIntProperty("thread.pool.num");
public static final Integer PARTITION_NUM_MAX = ConfigUtils.getIntProperty("druid.partition.num.max"); public static final Integer PARTITION_NUM_MAX = ConfigUtils.getIntProperty("druid.partition.num.max");
public static final Integer DRUID_CONNECTION_RETRY_TIME_MAX = ConfigUtils.getIntProperty("druid.connection.retry.time.max");
public static final Integer DRUID_CONNECTION_RETRY_SLEEP_TIME = ConfigUtils.getIntProperty("druid.connection.retry.sleep.time");
} }

View File

@@ -5,6 +5,7 @@ import cn.mesalab.dao.DruidData;
import cn.mesalab.service.algorithm.KalmanFilter; import cn.mesalab.service.algorithm.KalmanFilter;
import cn.mesalab.utils.DruidUtils; import cn.mesalab.utils.DruidUtils;
import cn.mesalab.utils.HbaseUtils; import cn.mesalab.utils.HbaseUtils;
import cn.mesalab.utils.RetryUtils;
import cn.mesalab.utils.SeriesUtils; import cn.mesalab.utils.SeriesUtils;
import org.apache.calcite.avatica.AvaticaClientRuntimeException; import org.apache.calcite.avatica.AvaticaClientRuntimeException;
import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaConnection;
@@ -16,6 +17,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException;
import java.util.*; import java.util.*;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -53,12 +55,26 @@ public class BaselineSingleThread extends Thread {
this.countDownLatch = countDownLatch; this.countDownLatch = countDownLatch;
} }
@SuppressWarnings("unchecked")
@Override @Override
public void run(){ public void run(){
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
// 数据读取 // 数据读取
LOG.info("开始数据读取"); LOG.info("开始数据读取");
Map<String, List<Map<String, Object>>> batchDruidData = getBatchDruidData(); Map<String, List<Map<String, Object>>> batchDruidData = new HashMap<>();
// Druid retry
try {
batchDruidData = (Map<String, List<Map<String, Object>>>) new RetryUtils() {
@Override
protected Object toTry(){
return getBatchDruidData();
}
}.setRetryTime(ApplicationConfig.DRUID_CONNECTION_RETRY_TIME_MAX)
.setSleepTime(ApplicationConfig.DRUID_CONNECTION_RETRY_SLEEP_TIME).execute();
} catch (InterruptedException e) {
e.printStackTrace();
}
LOG.info("完成数据读取获取Server IP" + batchDruidData.size() + LOG.info("完成数据读取获取Server IP" + batchDruidData.size() +
" 运行时间:" + (System.currentTimeMillis()- start)); " 运行时间:" + (System.currentTimeMillis()- start));
@@ -71,7 +87,7 @@ public class BaselineSingleThread extends Thread {
.filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList()); .filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList());
// baseline生成 // baseline生成
int[] ipBaseline = generateSingleIpBaseline(ip, ipDruidData); int[] ipBaseline = generateSingleIpBaseline(ip, ipDruidData);
if (ipBaseline!= null){ if ((ipBaseline!= null ) && (ip.length()>0)){
hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE); hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
} }
} }
@@ -96,7 +112,7 @@ public class BaselineSingleThread extends Thread {
readFromDruid = DruidData.readFromDruid(sql, stat); readFromDruid = DruidData.readFromDruid(sql, stat);
connection.close(); connection.close();
stat.close(); stat.close();
} catch (Exception e){ } catch (SQLException e){
e.printStackTrace(); e.printStackTrace();
} }
return readFromDruid; return readFromDruid;

View File

@@ -0,0 +1,72 @@
package cn.mesalab.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
/**
* @author yjy
* @version 1.0
* @date 2021/8/9 3:59 下午
*/
public abstract class RetryUtils {
private static final Logger LOG = LoggerFactory.getLogger(RetryUtils.class);
private static final int DEFAULT_RETRY_TIME = 1;
private int retryTime = DEFAULT_RETRY_TIME;
// 默认睡眠时间1s
private int sleepTime = 1000;
public int getSleepTime() {
return sleepTime;
}
public RetryUtils setSleepTime(int sleepTime) {
if(sleepTime < 0) {
throw new IllegalArgumentException("sleepTime should equal or bigger than 0");
}
this.sleepTime = sleepTime;
return this;
}
public int getRetryTime() {
return retryTime;
}
public RetryUtils setRetryTime(int retryTime) {
if (retryTime <= 0) {
throw new IllegalArgumentException("retryTime should bigger than 0");
}
this.retryTime = retryTime;
return this;
}
/**
* 重试的业务执行代码
* 失败时请抛出一个异常
* 预留方法:需要重试的业务代码,然后执行
* @return
*/
protected abstract Object toTry() throws Exception;
public Object execute() throws InterruptedException {
for (int i = 0; i < retryTime; i++) {
try {
return toTry();
} catch (Exception e) {
// LOG.error("Catched Too-Many-Connections Error, 等待重连");
Thread.sleep(sleepTime);
}
}
return null;
}
public Object submit(ExecutorService executorService) {
if (executorService == null) {
throw new IllegalArgumentException("please choose executorService!");
}
return executorService.submit((Callable) this::execute);
}
}

View File

@@ -4,7 +4,7 @@
#Druid配置 #Druid配置
druid.url=jdbc:avatica:remote:url=http://192.168.44.12:8082/druid/v2/sql/avatica/ druid.url=jdbc:avatica:remote:url=http://192.168.44.12:8082/druid/v2/sql/avatica/
druid.driver=org.apache.calcite.avatica.remote.Driver druid.driver=org.apache.calcite.avatica.remote.Driver
druid.table=top_server_ip_test_log druid.table=traffic_top_destination_ip_metrics_log
#HBase配置 #HBase配置
hbase.table=ddos_traffic_baselines hbase.table=ddos_traffic_baselines
@@ -24,16 +24,16 @@ read.druid.min.time=1625414400000
read.druid.max.time=1625673600000 read.druid.max.time=1625673600000
#字段映射 #字段映射
druid.attacktype.tcpsynflood=sessions druid.attacktype.tcpsynflood=TCP SYN Flood
druid.attacktype.udpflood=bytes druid.attacktype.udpflood=UDP Flood
druid.attacktype.icmpflood=packets druid.attacktype.icmpflood=ICMP Flood
druid.attacktype.dnsamplification=packets druid.attacktype.dnsamplification=DNS Amplification
druid.columnname.serverip=destination druid.columnname.serverip=destination_ip
druid.columnname.attacktype=order_by druid.columnname.attacktype=attack_type
druid.columnname.recvtime=__time druid.columnname.recvtime=__time
#FOR TEST #FOR TEST
druid.columnname.partition.num=session_num druid.columnname.partition.num=partition_num
baseline.metric.type=session_num baseline.metric.type=session_rate
#数据情况 #数据情况
#读取历史N天数据最小值为3天需要判断周期性 #读取历史N天数据最小值为3天需要判断周期性
@@ -64,9 +64,10 @@ baseline.kalman.m=2
################ 并发参数 ################# ################ 并发参数 #################
########################################## ##########################################
druid.read.batch.time.grad.hour=4 druid.read.batch.time.grad.hour=4
thread.pool.num=10 thread.pool.num=50
#druid分区字段partition_num的最大值 #druid分区字段partition_num的最大值为9999
druid.partition.num.max=10 druid.partition.num.max=10000
druid.connection.retry.time.max=10
druid.connection.retry.sleep.time=1000