Multi-threaded batch baseline generation and writing

yinjiangyi
2021-08-01 18:24:22 +08:00
parent bff209ac5a
commit f1e243ded0
18 changed files with 1375 additions and 285 deletions

File diff suppressed because one or more lines are too long

View File

@@ -6,7 +6,6 @@ import cn.mesalab.utils.DruidUtils;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
 import org.apache.calcite.avatica.AvaticaConnection;
-import org.apache.calcite.avatica.AvaticaStatement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,34 +27,39 @@ public class DruidData {
     private static final Logger LOG = LoggerFactory.getLogger(DruidData.class);
     private static DruidData druidData;
-    private static DruidUtils druidUtils;
+    private AvaticaConnection connection;
+    {
+        try {
+            connection = DruidUtils.getConn();
+        } catch (SQLException exception) {
+            exception.printStackTrace();
+        }
+    }
     private String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
             + " >= MILLIS_TO_TIMESTAMP(" + getTimeLimit()._2
             + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
             + " < MILLIS_TO_TIMESTAMP(" + getTimeLimit()._1 + ")";
-    static {
-        druidUtils = DruidUtils.getInstance();
-    }
     public static DruidData getInstance() {
-        if (druidData == null){
-            druidData = new DruidData();
-        }
+        druidData = new DruidData();
         return druidData;
     }
     public ArrayList<String> getServerIpList(String attackType) {
+        Long startQueryIPListTime = System.currentTimeMillis();
         ArrayList<String> serverIPs = new ArrayList<String>();
         String sql = "SELECT distinct " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
                 + " FROM " + ApplicationConfig.DRUID_TABLE
                 + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + " = '" + attackType + "'"
                 + " AND " + timeFilter
-                + " LIMIT 20"; // FOR TEST
+                + " LIMIT 100"; // FOR TEST
         try{
-            ResultSet resultSet = druidUtils.executeQuery(sql);
+            ResultSet resultSet = DruidUtils.executeQuery(connection, sql);
             while(resultSet.next()){
                 String ip = resultSet.getString(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
                 serverIPs.add(ip);
@@ -63,6 +67,9 @@ public class DruidData {
         } catch (Exception e){
             e.printStackTrace();
         }
+        Long endQueryIPListTime = System.currentTimeMillis();
+        LOG.info("Perf test: IP list query took " + (endQueryIPListTime - startQueryIPListTime) + " ms");
         return serverIPs;
     }
@@ -84,9 +91,9 @@ public class DruidData {
                 + " = '" + attackType + "'"
                 + " AND " + timeFilter;
-        System.out.println("getTimeSeriesData:" + sql);
+        // System.out.println("getTimeSeriesData:" + sql);
         try{
-            ResultSet resultSet = druidUtils.executeQuery(sql);
+            ResultSet resultSet = DruidUtils.executeQuery(connection, sql);
             ResultSetToListService service = new ResultSetToListServiceImp();
             rsList = service.selectAll(resultSet);
         } catch (Exception e){
@@ -129,6 +136,10 @@ public class DruidData {
     }
     public void closeConn(){
-        druidUtils.closeConnection();
+        try {
+            DruidUtils.closeConnection();
+        } catch (SQLException exception) {
+            exception.printStackTrace();
+        }
     }
 }
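Note on this file's changes: getInstance() no longer caches a singleton — every call now constructs a fresh DruidData, and the instance initializer opens a dedicated connection for it, so concurrent workers stop sharing a single connection and statement. A minimal sketch of the pattern, with a hypothetical class name and a placeholder JDBC URL (not the project's API):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Illustrative stand-in for the reworked DruidData. The instance initializer
// runs as part of every construction, so each thread that asks for an
// "instance" receives a connection of its own instead of a shared static one.
public class PerThreadClient {
    private Connection connection;

    // Instance initializer: executed on every constructor call.
    {
        try {
            // Placeholder URL; the real code reads ApplicationConfig.DRUID_URL.
            connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://example:8082/druid/v2/sql/avatica/");
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    // No null check any more: each caller gets a fresh instance.
    public static PerThreadClient getInstance() {
        return new PerThreadClient();
    }
}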

View File

@@ -42,9 +42,9 @@ public class BaselineGeneration {
         try{
             generateBaselinesThread(ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD);
-            //generateBaselines(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD);
-            //generateBaselines(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD);
-            //generateBaselines(ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL);
+            // generateBaselines(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD);
+            // generateBaselines(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD);
+            // generateBaselines(ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL);
             long last = System.currentTimeMillis();
             LOG.warn("Elapsed time: " + (last - start));
@@ -102,25 +102,39 @@ public class BaselineGeneration {
     }
     public static void generateBaselines(List<String> ipList, String attackType){
+        Long startGenerateTime = System.currentTimeMillis();
+        druidData = DruidData.getInstance();
         List<Put> putList = new ArrayList<>();
         for(String ip: ipList){
             int[] ipBaseline = generateSingleIpBaseline(ip, attackType);
             putList = hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
         }
+        Long endGenerateTime = System.currentTimeMillis();
+        // LOG.info("Perf test: baseline generation took " + (endGenerateTime - startGenerateTime) + " ms");
         try {
             hbaseTable.put(putList);
             LOG.info("HBase rows written: " + ApplicationConfig.GENERATE_BATCH_NUM);
         } catch (IOException e) {
             e.printStackTrace();
         }
+        Long endWriteTime = System.currentTimeMillis();
+        // LOG.info("Perf test: baseline write took " + (endWriteTime - endGenerateTime) + " ms");
     }
     private static int[] generateSingleIpBaseline(String ip, String attackType){
         // query
+        Long startQuerySingleIPTime = System.currentTimeMillis();
         List<Map<String, Object>> originSeries = druidData.getTimeSeriesData(ip, attackType);
+        Long endQuerySingleIPTime = System.currentTimeMillis();
+        LOG.info("Perf test: single-IP query took " + (endQuerySingleIPTime - startQuerySingleIPTime) + " ms");
         // fill missing values in the time series with 0
+        System.out.println("Current thread id: " + Thread.currentThread().getId());
+        System.out.println("Origin series size: " + originSeries.size());
         List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries);
         int[] baselineArr = new int[completSeries.size()];
@@ -148,8 +162,9 @@ public class BaselineGeneration {
             }
         }
-        System.out.println(ip);
-        System.out.println(Arrays.toString(baselineArr));
+        Long endGenerateSingleIPTime = System.currentTimeMillis();
+        LOG.info("Perf test: single baseline generation took " + (endGenerateSingleIPTime - endQuerySingleIPTime) + " ms");
         return baselineArr;
     }
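generateBaselinesThread itself is not part of this diff, so how the IP list is fanned out is not visible here; presumably it splits the list into batches of generate.batch.number and lets each pool thread run generateBaselines. A hypothetical driver under that assumption (class name, pool size, and timeout are made up):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical fan-out: each batch of BATCH_SIZE IPs becomes one pool task
// that runs the generateBaselines(batch, attackType) shown in the diff above.
public class BaselineDriver {
    private static final int BATCH_SIZE = 100; // mirrors generate.batch.number=100

    public static void generateBaselinesThread(List<String> ipList, String attackType)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(8); // pool size is a guess
        for (int i = 0; i < ipList.size(); i += BATCH_SIZE) {
            final List<String> batch =
                    ipList.subList(i, Math.min(i + BATCH_SIZE, ipList.size()));
            pool.submit(() -> generateBaselines(batch, attackType));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }

    private static void generateBaselines(List<String> batch, String attackType) {
        // stand-in for BaselineGeneration.generateBaselines above
    }
}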

View File

@@ -1,154 +0,0 @@
package cn.mesalab.utils;
import com.google.common.collect.Lists;
import org.springframework.core.task.AsyncTaskExecutor;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/**
 * One-line summary: concurrency helper utilities.
 *
 * @create 2020-05-28
 * @since 1.0.0
 */
public class ConcurrentUtils {
    /**
     * @Description
     * Concurrent execution helper.
     *
     * @param batchProcessNum number of concurrent workers
     * @param executor thread pool used for execution
     * @param keys keys to process in bulk
     * @param kvConvert derives a value from a key
     * @param valueProcessor post-processing applied to each value
     * @param exceptionHook custom handling when deriving a value from a key throws
     * @return void
     **/
public static <K, V> void concurrentProcess(int batchProcessNum, AsyncTaskExecutor executor, K[] keys,
Function<K, V> kvConvert,
ValueProcessor<K, V> valueProcessor,
ExceptionHook<K> exceptionHook ){
AtomicInteger index = new AtomicInteger(1);
List<Future> futureList = Lists.newArrayListWithExpectedSize(batchProcessNum);
final int length = keys.length;
System.out.println("total jobs size:{}" + length);
for (int i = 0; i < batchProcessNum; i++) {
int finalI = i;
futureList.add(executor.submit(() -> {
System.out.println("batch process start thread num:{}"+ finalI);
int currentIndex;
while (length >= (currentIndex = index.getAndIncrement())) {
System.out.println("current job index:"+currentIndex+" of "+ length);
K key = keys[currentIndex-1];
try {
valueProcessor.process(key, kvConvert.apply(key));
} catch (Exception e) {
exceptionHook.process(key, e);
}
}
System.out.println("batch process end thread num:{}"+finalI);
}));
}
waitFutureFinished(futureList);
}
public static <K, V> void concurrentProcess(int batchProcessNum, AsyncTaskExecutor executor, K[] keys,
Function<K, V> kvConvert,
ValueProcessor<K, V> valueProcessor) {
concurrentProcess(batchProcessNum, executor, keys, kvConvert, valueProcessor,
                (key, e) -> System.out.println("Exception getting value for key: " + key + ", e: " + e));
}
    /**
     * @Description
     * Concurrent execution helper.
     *
     * @param batchProcessNum number of concurrent workers
     * @param executor thread pool used for execution
     * @param keys keys to process in bulk
     * @param kvConvert derives a value from a key
     * @return Collection<V> the collected values
     **/
public static <K, V> Collection<V> concurrentGet(int batchProcessNum, AsyncTaskExecutor executor, K[] keys,
Function<K, V> kvConvert) {
List<V> rt = Lists.newArrayListWithCapacity(keys.length);
        concurrentProcess(batchProcessNum, executor, keys, kvConvert, (k, v) -> {
            if (v == null) {
                System.out.println("key: " + k + " apply value is null");
                return;
            }
            rt.add(v);
        });
return rt;
}
    public static void waitFutureFinished(List<Future> unfinishedFuture, boolean ignoreException) {
boolean interrupt = false;
while (!unfinishedFuture.isEmpty()) {
Iterator<Future> iterator = unfinishedFuture.iterator();
while (iterator.hasNext()) {
Future next = iterator.next();
if (next.isDone()) {
try {
next.get();
} catch (InterruptedException | ExecutionException e) {
System.out.println("执行异常, e:"+e);
if (!ignoreExcetion) {
interrupt = true;
break;
}
}
iterator.remove();
}
}
if (interrupt) {
break;
}
sleep();
}
        if (interrupt) {
            Iterator<Future> iterator = unfinishedFuture.iterator();
            while (iterator.hasNext()) {
                Future next = iterator.next();
                if (!next.isDone()) { // cancel tasks still running; finished futures need no cancel
                    next.cancel(true);
                }
            }
            throw new RuntimeException("Tasks terminated abnormally");
        }
}
public static void waitFutureFinished(List<Future> unfinishedFuture) {
waitFutureFinished(unfinishedFuture, false);
}
public static void sleep() {
sleep(5000);
}
public static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
System.out.println("sleep error, e:" + e);
}
}
@FunctionalInterface
public interface ValueProcessor<K, V> {
void process(K key, V value);
}
@FunctionalInterface
public interface ExceptionHook<K>{
void process(K key, Exception e);
}
}
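For reference, the helper deleted above would have been driven roughly as follows — N workers pull keys from a shared counter, convert each key to a value, and post-process it. Pool size, keys, and all three lambdas here are illustrative stubs:

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Hypothetical usage of the deleted ConcurrentUtils.concurrentProcess.
public class ConcurrentUtilsDemo {
    public static void main(String[] args) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(8);
        executor.initialize();

        String[] ips = {"10.0.0.1", "10.0.0.2", "10.0.0.3"};
        ConcurrentUtils.concurrentProcess(
                8, executor, ips,
                ip -> ip.length(),                                    // kvConvert (stub)
                (ip, len) -> System.out.println(ip + " -> " + len),   // valueProcessor
                (ip, e) -> System.err.println(ip + " failed: " + e)); // exceptionHook
        executor.shutdown();
    }
}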

View File

@@ -1,12 +1,9 @@
 package cn.mesalab.utils;
 import cn.mesalab.config.ApplicationConfig;
-import cn.mesalab.service.BaselineGeneration;
 import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaStatement;
-import org.jfree.util.Log;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.client.Table;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
@@ -19,59 +16,39 @@ import java.util.Properties;
 * @date 2021/7/23 4:50 PM
 */
 public class DruidUtils {
-    private static final Logger LOG = LoggerFactory.getLogger(DruidUtils.class);
-    private static DruidUtils druidUtils;
+    private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<AvaticaConnection>();
     private static final String DRUID_URL = ApplicationConfig.DRUID_URL;
-    private static AvaticaConnection connection;
-    private static AvaticaStatement statement;
-    public DruidUtils() throws SQLException {
-        Properties properties = new Properties();
-        connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);
-        statement = connection.createStatement();
-    }
-    public static DruidUtils getInstance() {
-        if (connection==null){
-            try{
-                druidUtils = new DruidUtils();
-            } catch (SQLException e){
-                LOG.error("Failed to establish Druid connection!");
-                e.printStackTrace();
-            }
-        }
-        return druidUtils;
-    }
-    private static AvaticaConnection getConn() throws SQLException {
+    /**
+     * Open a connection
+     * @throws SQLException
+     */
+    public static AvaticaConnection getConn() throws SQLException {
         Properties properties = new Properties();
         AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);
+        threadLocal.set(connection);
         return connection;
     }
-    public void closeConnection() {
-        if(connection != null){
-            try{
-                connection.close();
-            } catch (SQLException e){
-                LOG.error("Failed to close Druid connection!");
-                e.printStackTrace();
-            }
-        }
-    }
+    /**
+     * Close this thread's connection
+     */
+    public static void closeConnection() throws SQLException{
+        AvaticaConnection conn = threadLocal.get();
+        if(conn != null){
+            conn.close();
+            threadLocal.remove();
+        }
+    }
-    public ResultSet executeQuery (String sql) throws SQLException{
-        System.out.println("executeQuery:"+sql);
+    /**
+     * Execute a SQL query and return its result
+     */
+    public static ResultSet executeQuery (AvaticaConnection connection, String sql) throws SQLException{
+        AvaticaStatement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery(sql);
         return resultSet;
     }
-    public AvaticaConnection getConnection() {
-        return connection;
-    }
-    public AvaticaStatement getStatement() {
-        return statement;
-    }
 }
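With the static, ThreadLocal-backed API above, a worker thread owns its connection end to end: getConn() opens and caches it, executeQuery() runs statements against it, and closeConnection() closes and evicts only the calling thread's entry. A sketch of that lifecycle (the worker class and SQL are illustrative):

import cn.mesalab.utils.DruidUtils;
import org.apache.calcite.avatica.AvaticaConnection;
import java.sql.ResultSet;
import java.sql.SQLException;

// Each worker opens its own connection, queries through it, and closes it in
// finally so the ThreadLocal entry is removed even on failure.
public class QueryWorker implements Runnable {
    @Override
    public void run() {
        try {
            AvaticaConnection conn = DruidUtils.getConn();
            ResultSet rs = DruidUtils.executeQuery(conn, "SELECT 1"); // illustrative SQL
            while (rs.next()) {
                // consume the result set
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            try {
                DruidUtils.closeConnection(); // closes only this thread's connection
            } catch (SQLException ignored) {
            }
        }
    }
}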

View File

@@ -143,8 +143,6 @@ public class SeriesUtils {
         if(denominator>0) {
             result = numerator / denominator;
         }
-        // FOR TEST
-        System.out.println(result);
         return result;
     }

View File

@@ -54,7 +54,5 @@ baseline.kalman.r=0.002
 # print a log line for every 1000 updated records
 log.write.count=10000
 # FOR TEST
-generate.batch.number=10
+generate.batch.number=100

View File

@@ -1,61 +0,0 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
 * @author yjy
 * @version 1.0
 * @date 2021/7/30 11:09 AM
 */
public class ThreadList {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<String> list = new ArrayList<>(); // build test data
        for (int i = 0; i < 5300; i++) {
            list.add("" + i);
        }
        // compute the thread count
        int threadSize = 500; // one thread per 500 items
        int remainder = list.size() % threadSize; // remainder
        int threadNum = 0; // thread count
        if (remainder == 0) { // evenly divisible by 500
            threadNum = list.size() / threadSize;
        } else { // not evenly divisible, so add one more thread
            threadNum = list.size() / threadSize + 1;
        }
        ExecutorService eService = Executors.newFixedThreadPool(threadNum); // create a thread pool
        List<Callable<String>> cList = new ArrayList<>(); // tasks to submit
        Callable<String> task = null; // a single task
        List<String> sList = null;
        for (int i = 0; i < threadNum; i++) { // give each task its slice of the data
            if (i == threadNum - 1) {
                sList = list.subList(i * threadSize, list.size());
            } else {
                sList = list.subList(i * threadSize, (i + 1) * threadSize);
            }
            final List<String> nowList = sList;
            // create a single task
            task = new Callable<String>() {
                @Override
                public String call() throws Exception {
                    StringBuffer sb = new StringBuffer();
                    for (int j = 0; j < nowList.size(); j++) {
                        sb.append("" + nowList.get(j));
                    }
                    return sb.toString();
                }
            };
            cList.add(task); // add the task
        }
        List<Future<String>> results = eService.invokeAll(cList); // run all tasks, collecting every return value
        for (Future<String> str : results) { // print the return values
            System.out.println(str.get());
        }
        eService.shutdown();
}
}
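Aside: the remainder if/else in this scratch file is ordinary ceiling division and can be collapsed to one line, as in this tiny equivalent sketch:

public class ThreadCount {
    public static void main(String[] args) {
        int threadSize = 500; // items per thread, as in ThreadList
        int listSize = 5300;  // total items, as in ThreadList
        // Integer ceiling division replaces the remainder branch above.
        int threadNum = (listSize + threadSize - 1) / threadSize;
        System.out.println(threadNum); // 11, i.e. 5300/500 rounded up
    }
}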

View File

@@ -54,7 +54,7 @@ baseline.kalman.r=0.002
 # print a log line for every 1000 updated records
 log.write.count=10000
 # FOR TEST
-generate.batch.number=10
+generate.batch.number=100

Binary file not shown.