Create the connection inside the thread
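Instead of one shared AvaticaConnection/AvaticaStatement being handed to every worker, each worker now opens its own connection inside the thread. Below is a minimal sketch (not the exact file contents) of what the worker's call() could look like after this change; the row-mapping loop, the try/finally, and the local names connection/stat/rs/row are assumptions, while DruidUtils.getConn() and DruidUtils.getStatement() are the helpers already visible in the diff that follows.

    // Sketch only: the JDBC objects are created, used, and closed inside the
    // submitted Callable, so nothing is shared across threads.
    @Override
    public ArrayList<Map<String, Object>> call() throws Exception {
        long start = System.currentTimeMillis();
        AvaticaConnection connection = DruidUtils.getConn();          // per-thread connection (project helper)
        AvaticaStatement stat = DruidUtils.getStatement(connection);  // per-thread statement
        ArrayList<Map<String, Object>> resultData = new ArrayList<>();
        try (ResultSet rs = stat.executeQuery(sql)) {                 // standard java.sql call; `sql` is the constructor field
            ResultSetMetaData meta = rs.getMetaData();
            while (rs.next()) {                                       // one Map per returned row
                Map<String, Object> row = new HashMap<>();
                for (int i = 1; i <= meta.getColumnCount(); i++) {
                    row.put(meta.getColumnName(i), rs.getObject(i));
                }
                resultData.add(row);
            }
        } finally {
            stat.close();                                             // both closed by the thread that created them
            connection.close();
        }
        long end = System.currentTimeMillis();
        LOG.info(sql + "\nread " + resultData.size() + " rows, elapsed time: " + (end - start));
        return resultData;
    }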
@@ -19,14 +19,11 @@ public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, O
     private static final Logger LOG = LoggerFactory.getLogger(ReadHistoricalDruidData.class);
 
     private String sql;
-    private AvaticaStatement statement;
 
     public ReadHistoricalDruidData(
-            String sql,
-            AvaticaStatement statement
+            String sql
     ){
         this.sql = sql;
-        this.statement = statement;
     }
 
     @Override
@@ -40,7 +37,7 @@ public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, O
             long end = System.currentTimeMillis();
             LOG.info(sql + "\nread " + resultData.size() + " rows, elapsed time: " + (end - start));
             connection.close();
-            statement.close();
+            stat.close();
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -27,17 +27,6 @@ import java.util.concurrent.*;
 public class BaselineGeneration {
     private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);
 
-    private static final AvaticaConnection druidConn = DruidUtils.getConn();
-    private static AvaticaStatement druidStatement;
-
-    static {
-        try {
-            druidStatement = DruidUtils.getStatement(druidConn);
-        } catch (SQLException exception) {
-            exception.printStackTrace();
-        }
-    }
-
     private static final Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();
 
     private static final List<String> ATTACK_TYPE_LIST = Arrays.asList(
@@ -70,7 +59,6 @@ public class BaselineGeneration {
         long last = System.currentTimeMillis();
         LOG.warn("Elapsed time: " + (last - start));
 
-        druidConn.close();
         hbaseTable.close();
         LOG.info("Druid connection closed");
 
@@ -101,8 +89,7 @@ public class BaselineGeneration {
         for (int i = 0; i < (START_END_TIMES._1-START_END_TIMES._2)/timeGrad; i++) {
             String sql = DruidData.getDruidQuerySql(START_END_TIMES._1, i, timeGrad);
             ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
-                    sql,
-                    druidStatement
+                    sql
             );
             Future<ArrayList<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData);
             resultList.add(future);
@@ -145,8 +132,6 @@ public class BaselineGeneration {
             if(batchIps.size()>0){
                 BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
                         batchIps,
-                        druidConn,
-                        druidStatement,
                         hbaseTable,
                         ATTACK_TYPE_LIST,
                         BASELINE_POINT_NUM,
@@ -40,8 +40,6 @@ public class BaselineSingleThread extends Thread {
 
     public BaselineSingleThread(
             List<String> batchIpList,
-            AvaticaConnection druidConn,
-            AvaticaStatement druidStatement,
             Table hbaseTable,
             List<String> attackTypeList,
             Integer BASELINE_POINT_NUM,
@@ -49,8 +47,6 @@ public class BaselineSingleThread extends Thread {
             List<Map<String, Object>> historicalData
     ){
         this.ipList = batchIpList;
-        this.druidConn = druidConn;
-        this.druidStatement = druidStatement;
         this.hbaseTable = hbaseTable;
         this.attackTypeList = attackTypeList;
         this.BASELINE_POINT_NUM = BASELINE_POINT_NUM;
@@ -60,7 +56,6 @@ public class BaselineSingleThread extends Thread {
 
     @Override
     public void run(){
-        // batchDruidData = DruidData.readFromDruid(druidConn, druidStatement, ipList, timeFilter);
         batchDruidData = DruidData.getBatchData(historicalData, ipList);
 
         List<Put> putList = new ArrayList<>();
@@ -50,7 +50,7 @@ baseline.kalman.p=0.000001
 baseline.kalman.r=4
 
 # FOR TEST
-baseline.generate.batch.size=1000
+baseline.generate.batch.size=100
 druid.read.batch.time.grad.hour=4
 thread.max.num=10
 