优化构建baseline方式
This commit is contained in:
43
src/main/java/com/zdjizhi/common/DosBaselineThreshold.java
Normal file
43
src/main/java/com/zdjizhi/common/DosBaselineThreshold.java
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
package com.zdjizhi.common;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
public class DosBaselineThreshold implements Serializable {
|
||||||
|
private ArrayList<Integer> session_rate;
|
||||||
|
private Integer session_rate_baseline_type;
|
||||||
|
private Integer session_rate_default_value;
|
||||||
|
|
||||||
|
public ArrayList<Integer> getSession_rate() {
|
||||||
|
return session_rate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSession_rate(ArrayList<Integer> session_rate) {
|
||||||
|
this.session_rate = session_rate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getSession_rate_baseline_type() {
|
||||||
|
return session_rate_baseline_type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSession_rate_baseline_type(Integer session_rate_baseline_type) {
|
||||||
|
this.session_rate_baseline_type = session_rate_baseline_type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getSession_rate_default_value() {
|
||||||
|
return session_rate_default_value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSession_rate_default_value(Integer session_rate_default_value) {
|
||||||
|
this.session_rate_default_value = session_rate_default_value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "DosBaselineThreshold{" +
|
||||||
|
"session_rate=" + session_rate +
|
||||||
|
", session_rate_baseline_type=" + session_rate_baseline_type +
|
||||||
|
", session_rate_default_value=" + session_rate_default_value +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,9 +1,6 @@
|
|||||||
package com.zdjizhi.etl;
|
package com.zdjizhi.etl;
|
||||||
|
|
||||||
import com.zdjizhi.common.CommonConfig;
|
import com.zdjizhi.common.*;
|
||||||
import com.zdjizhi.common.DosDetectionThreshold;
|
|
||||||
import com.zdjizhi.common.DosEventLog;
|
|
||||||
import com.zdjizhi.common.DosSketchLog;
|
|
||||||
import com.zdjizhi.utils.*;
|
import com.zdjizhi.utils.*;
|
||||||
import inet.ipaddr.IPAddress;
|
import inet.ipaddr.IPAddress;
|
||||||
import inet.ipaddr.IPAddressString;
|
import inet.ipaddr.IPAddressString;
|
||||||
@@ -11,7 +8,6 @@ import org.apache.commons.lang.StringUtils;
|
|||||||
import org.apache.commons.lang.text.StrBuilder;
|
import org.apache.commons.lang.text.StrBuilder;
|
||||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||||
import org.apache.flink.api.common.functions.RichMapFunction;
|
import org.apache.flink.api.common.functions.RichMapFunction;
|
||||||
import org.apache.flink.api.java.tuple.Tuple2;
|
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
|
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -30,11 +26,15 @@ import java.util.concurrent.TimeUnit;
|
|||||||
public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);
|
private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);
|
||||||
private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
|
private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
|
||||||
private final static int BASELINE_SIZE = 144;
|
|
||||||
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
|
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
|
||||||
private HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap;
|
private HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap;
|
||||||
|
|
||||||
|
private final static int BASELINE_SIZE = 144;
|
||||||
|
private final static int STATIC_CONDITION_TYPE = 1;
|
||||||
|
private final static int BASELINE_CONDITION_TYPE = 2;
|
||||||
|
private final static int SENSITIVITY_CONDITION_TYPE = 3;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void open(Configuration parameters) {
|
public void open(Configuration parameters) {
|
||||||
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
|
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
|
||||||
@@ -43,7 +43,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
|||||||
executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0,
|
executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0,
|
||||||
CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
|
CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
|
||||||
|
|
||||||
executorService.scheduleAtFixedRate(() -> baselineMap = HbaseUtils.readFromHbase(), 0,
|
executorService.scheduleAtFixedRate(() -> baselineMap = ParseBaselineThreshold.readFromHbase(), 0,
|
||||||
CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
|
CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("定时器任务执行失败", e);
|
logger.error("定时器任务执行失败", e);
|
||||||
@@ -92,8 +92,8 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
|||||||
String attackType = value.getAttack_type();
|
String attackType = value.getAttack_type();
|
||||||
long sketchSessions = value.getSketch_sessions();
|
long sketchSessions = value.getSketch_sessions();
|
||||||
if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) {
|
if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) {
|
||||||
Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
|
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(destinationIp).get(attackType);
|
||||||
Integer base = getBaseValue(floodTypeTup, value);
|
Integer base = getBaseValue(dosBaselineThreshold, value);
|
||||||
result = getDosEventLog(value, base, sketchSessions - base, 2, "sessions");
|
result = getDosEventLog(value, base, sketchSessions - base, 2, "sessions");
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
@@ -124,7 +124,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
|||||||
double percent = getDiffPercent(diff, base);
|
double percent = getDiffPercent(diff, base);
|
||||||
Severity severity = judgeSeverity(percent);
|
Severity severity = judgeSeverity(percent);
|
||||||
if (severity != Severity.NORMAL) {
|
if (severity != Severity.NORMAL) {
|
||||||
if (type == 2 && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD) {
|
if (type == BASELINE_CONDITION_TYPE && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD) {
|
||||||
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
|
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
|
||||||
} else {
|
} else {
|
||||||
result = getResult(value, base, severity, percent, type, tag);
|
result = getResult(value, base, severity, percent, type, tag);
|
||||||
@@ -156,12 +156,12 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
|||||||
return dosEventLog;
|
return dosEventLog;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Integer getBaseValue(Tuple2<ArrayList<Integer>, Integer> floodTypeTup, DosSketchLog value) {
|
private Integer getBaseValue(DosBaselineThreshold dosBaselineThreshold, DosSketchLog value) {
|
||||||
Integer base = 0;
|
Integer base = 0;
|
||||||
try {
|
try {
|
||||||
if (floodTypeTup != null) {
|
if (dosBaselineThreshold != null) {
|
||||||
ArrayList<Integer> baselines = floodTypeTup.f0;
|
ArrayList<Integer> baselines = dosBaselineThreshold.getSession_rate();
|
||||||
Integer defaultVaule = floodTypeTup.f1;
|
Integer defaultVaule = dosBaselineThreshold.getSession_rate_default_value();
|
||||||
if (baselines != null && baselines.size() == BASELINE_SIZE) {
|
if (baselines != null && baselines.size() == BASELINE_SIZE) {
|
||||||
int timeIndex = getCurrentTimeIndex(value.getSketch_start_time());
|
int timeIndex = getCurrentTimeIndex(value.getSketch_start_time());
|
||||||
base = baselines.get(timeIndex);
|
base = baselines.get(timeIndex);
|
||||||
@@ -179,25 +179,25 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
|||||||
|
|
||||||
private String getConditions(String percent, long base, long sessions, int type, String tag) {
|
private String getConditions(String percent, long base, long sessions, int type, String tag) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case 1:
|
case STATIC_CONDITION_TYPE:
|
||||||
return new StrBuilder()
|
return new StrBuilder()
|
||||||
.append(tag).append(" > ")
|
.append(tag).append(" > ")
|
||||||
.append(base).append(" ")
|
.append(base).append(" ")
|
||||||
.append(tag).append("/s")
|
.append(tag).append("/s")
|
||||||
.toString();
|
.toString();
|
||||||
case 2:
|
case BASELINE_CONDITION_TYPE:
|
||||||
return new StrBuilder()
|
return new StrBuilder()
|
||||||
.append(tag).append(" > ")
|
.append(tag).append(" > ")
|
||||||
.append(percent).append(" of baseline")
|
.append(percent).append(" of baseline")
|
||||||
.toString();
|
.toString();
|
||||||
case 3:
|
case SENSITIVITY_CONDITION_TYPE:
|
||||||
return new StrBuilder()
|
return new StrBuilder()
|
||||||
.append(sessions).append(" ")
|
.append(sessions).append(" ")
|
||||||
.append(tag).append("/s Unusually high ")
|
.append(tag).append("/s Unusually high ")
|
||||||
.append(StringUtils.capitalize(tag))
|
.append(StringUtils.capitalize(tag))
|
||||||
.toString();
|
.toString();
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException("Illegal Argument " + type + ", known types = [1,2,3]");
|
throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -267,49 +267,24 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
|||||||
/**
|
/**
|
||||||
* 判断严重程度枚举类型
|
* 判断严重程度枚举类型
|
||||||
*/
|
*/
|
||||||
CRITICAL("Critical", 5),
|
CRITICAL("Critical"),
|
||||||
SEVERE("Severe", 4),
|
SEVERE("Severe"),
|
||||||
MAJOR("Major", 3),
|
MAJOR("Major"),
|
||||||
WARNING("Warning", 2),
|
WARNING("Warning"),
|
||||||
MINOR("Minor", 1),
|
MINOR("Minor"),
|
||||||
NORMAL("Normal", 0);
|
NORMAL("Normal");
|
||||||
|
|
||||||
private final String severity;
|
private final String severity;
|
||||||
private final int score;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return this.severity;
|
return this.severity;
|
||||||
}
|
}
|
||||||
|
|
||||||
Severity(String severity, int score) {
|
Severity(String severity) {
|
||||||
this.severity = severity;
|
this.severity = severity;
|
||||||
this.score = score;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
private DosEventLog mergeFinalResult(Tuple2<Severity, DosEventLog> eventLogByBaseline, Tuple2<Severity, DosEventLog> eventLogByStaticThreshold) {
|
|
||||||
if (eventLogByBaseline.f0.score > eventLogByStaticThreshold.f0.score) {
|
|
||||||
logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}", eventLogByBaseline, eventLogByStaticThreshold);
|
|
||||||
return mergeCondition(eventLogByBaseline.f1, eventLogByStaticThreshold.f1);
|
|
||||||
} else {
|
|
||||||
logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}", eventLogByStaticThreshold, eventLogByBaseline);
|
|
||||||
return mergeCondition(eventLogByStaticThreshold.f1, eventLogByBaseline.f1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
private DosEventLog mergeCondition(DosEventLog log1, DosEventLog log2) {
|
|
||||||
if (log1 != null && log2 != null) {
|
|
||||||
String conditions1 = log1.getConditions();
|
|
||||||
String conditions2 = log2.getConditions();
|
|
||||||
log1.setConditions(conditions1 + " and " + conditions2);
|
|
||||||
} else if (log1 == null && log2 != null) {
|
|
||||||
log1 = log2;
|
|
||||||
}
|
|
||||||
return log1;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
100
src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java
Normal file
100
src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
package com.zdjizhi.etl;
|
||||||
|
|
||||||
|
import com.zdjizhi.common.CommonConfig;
|
||||||
|
import com.zdjizhi.common.DosBaselineThreshold;
|
||||||
|
import com.zdjizhi.utils.HbaseUtils;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.*;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public class ParseBaselineThreshold {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(ParseBaselineThreshold.class);
|
||||||
|
private static ArrayList<String> floodTypeList = new ArrayList<>();
|
||||||
|
|
||||||
|
private static Table table = null;
|
||||||
|
private static Scan scan = null;
|
||||||
|
|
||||||
|
static {
|
||||||
|
floodTypeList.add("TCP SYN Flood");
|
||||||
|
floodTypeList.add("UDP Flood");
|
||||||
|
floodTypeList.add("ICMP Flood");
|
||||||
|
floodTypeList.add("DNS Flood");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void prepareHbaseEnv() throws IOException {
|
||||||
|
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
|
||||||
|
|
||||||
|
config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
|
||||||
|
config.set("hbase.client.retries.number", "3");
|
||||||
|
config.set("hbase.bulkload.retries.number", "3");
|
||||||
|
config.set("zookeeper.recovery.retry", "3");
|
||||||
|
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||||
|
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
|
||||||
|
|
||||||
|
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
|
||||||
|
Connection conn = ConnectionFactory.createConnection(config);
|
||||||
|
table = conn.getTable(tableName);
|
||||||
|
scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
|
||||||
|
logger.info("连接hbase成功,正在读取baseline数据");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static Map<String, Map<String, DosBaselineThreshold>> readFromHbase() {
|
||||||
|
Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
|
||||||
|
try {
|
||||||
|
prepareHbaseEnv();
|
||||||
|
logger.info("开始读取baseline数据");
|
||||||
|
ResultScanner rs = table.getScanner(scan);
|
||||||
|
for (Result result : rs) {
|
||||||
|
Map<String, DosBaselineThreshold> floodTypeMap = new HashMap<>();
|
||||||
|
String rowkey = Bytes.toString(result.getRow());
|
||||||
|
for (String type:floodTypeList){
|
||||||
|
DosBaselineThreshold baselineThreshold = new DosBaselineThreshold();
|
||||||
|
ArrayList<Integer> sessionRate = HbaseUtils.getArraylist(result, type, "session_rate");
|
||||||
|
if (sessionRate != null && !sessionRate.isEmpty()){
|
||||||
|
Integer defaultValue = HbaseUtils.getIntegerValue(result, type, "session_rate_default_value");
|
||||||
|
Integer rateBaselineType = HbaseUtils.getIntegerValue(result, type, "session_rate_baseline_type");
|
||||||
|
baselineThreshold.setSession_rate(sessionRate);
|
||||||
|
baselineThreshold.setSession_rate_default_value(defaultValue);
|
||||||
|
baselineThreshold.setSession_rate_baseline_type(rateBaselineType);
|
||||||
|
floodTypeMap.put(type,baselineThreshold);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
baselineMap.put(rowkey, floodTypeMap);
|
||||||
|
}
|
||||||
|
logger.info("格式化baseline数据成功,读取IP共:{}", baselineMap.size());
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("读取hbase数据失败", e);
|
||||||
|
}
|
||||||
|
return baselineMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
Map<String, Map<String, DosBaselineThreshold>> baselineMap = readFromHbase();
|
||||||
|
Set<String> keySet = baselineMap.keySet();
|
||||||
|
for (String key : keySet) {
|
||||||
|
Map<String, DosBaselineThreshold> stringTuple2Map = baselineMap.get(key);
|
||||||
|
Set<String> strings = stringTuple2Map.keySet();
|
||||||
|
for (String s:strings){
|
||||||
|
DosBaselineThreshold dosBaselineThreshold = stringTuple2Map.get(s);
|
||||||
|
System.out.println(key+"---"+s+"---"+dosBaselineThreshold);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
System.out.println(baselineMap.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,17 +1,10 @@
|
|||||||
package com.zdjizhi.utils;
|
package com.zdjizhi.utils;
|
||||||
|
|
||||||
import com.zdjizhi.common.CommonConfig;
|
|
||||||
import org.apache.flink.api.java.tuple.Tuple2;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.client.*;
|
import org.apache.hadoop.hbase.client.*;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.io.ArrayWritable;
|
import org.apache.hadoop.io.ArrayWritable;
|
||||||
import org.apache.hadoop.io.IntWritable;
|
import org.apache.hadoop.io.IntWritable;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
@@ -22,75 +15,8 @@ import java.util.*;
|
|||||||
* @author wlh
|
* @author wlh
|
||||||
*/
|
*/
|
||||||
public class HbaseUtils {
|
public class HbaseUtils {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class);
|
|
||||||
private static Table table = null;
|
|
||||||
private static Scan scan = null;
|
|
||||||
private static ArrayList<String> floodTypeList = new ArrayList<>();
|
|
||||||
|
|
||||||
static {
|
public static Integer getIntegerValue(Result result, String family, String qualifier) {
|
||||||
floodTypeList.add("TCP SYN Flood");
|
|
||||||
floodTypeList.add("UDP Flood");
|
|
||||||
floodTypeList.add("ICMP Flood");
|
|
||||||
floodTypeList.add("DNS Flood");
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void prepareHbaseEnv() throws IOException {
|
|
||||||
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
|
|
||||||
|
|
||||||
config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
|
|
||||||
config.set("hbase.client.retries.number", "3");
|
|
||||||
config.set("hbase.bulkload.retries.number", "3");
|
|
||||||
config.set("zookeeper.recovery.retry", "3");
|
|
||||||
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
|
|
||||||
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
|
|
||||||
|
|
||||||
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
|
|
||||||
Connection conn = ConnectionFactory.createConnection(config);
|
|
||||||
table = conn.getTable(tableName);
|
|
||||||
scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
|
|
||||||
logger.info("连接hbase成功,正在读取baseline数据");
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = readFromHbase();
|
|
||||||
Set<String> keySet = baselineMap.keySet();
|
|
||||||
for (String key : keySet) {
|
|
||||||
Map<String, Tuple2<ArrayList<Integer>, Integer>> stringTuple2Map = baselineMap.get(key);
|
|
||||||
Set<String> strings = stringTuple2Map.keySet();
|
|
||||||
for (String s:strings){
|
|
||||||
Tuple2<ArrayList<Integer>, Integer> arrayListIntegerTuple2 = stringTuple2Map.get(s);
|
|
||||||
System.out.println(key+"---"+s+"---"+arrayListIntegerTuple2.f0+"---"+arrayListIntegerTuple2.f1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
System.out.println(baselineMap.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> readFromHbase() {
|
|
||||||
Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
|
|
||||||
try {
|
|
||||||
prepareHbaseEnv();
|
|
||||||
logger.info("开始读取baseline数据");
|
|
||||||
ResultScanner rs = table.getScanner(scan);
|
|
||||||
for (Result result : rs) {
|
|
||||||
Map<String, Tuple2<ArrayList<Integer>, Integer>> floodTypeMap = new HashMap<>();
|
|
||||||
String rowkey = Bytes.toString(result.getRow());
|
|
||||||
for (String type:floodTypeList){
|
|
||||||
ArrayList<Integer> sessionRate = getArraylist(result, type, "session_rate");
|
|
||||||
if (sessionRate != null && !sessionRate.isEmpty()){
|
|
||||||
Integer defaultValue = getDefaultValue(result, type, "session_rate_default_value");
|
|
||||||
floodTypeMap.put(type,Tuple2.of(sessionRate, defaultValue));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
baselineMap.put(rowkey, floodTypeMap);
|
|
||||||
}
|
|
||||||
logger.info("格式化baseline数据成功,读取IP共:{}", baselineMap.size());
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("读取hbase数据失败", e);
|
|
||||||
}
|
|
||||||
return baselineMap;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Integer getDefaultValue(Result result, String family, String qualifier) {
|
|
||||||
byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
|
byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
|
||||||
if (value != null){
|
if (value != null){
|
||||||
return Bytes.toInt(value);
|
return Bytes.toInt(value);
|
||||||
@@ -98,7 +24,7 @@ public class HbaseUtils {
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ArrayList<Integer> getArraylist(Result result, String family, String qualifier) throws IOException {
|
public static ArrayList<Integer> getArraylist(Result result, String family, String qualifier) throws IOException {
|
||||||
if (containsColumn(result, family, qualifier)) {
|
if (containsColumn(result, family, qualifier)) {
|
||||||
ArrayWritable w = new ArrayWritable(IntWritable.class);
|
ArrayWritable w = new ArrayWritable(IntWritable.class);
|
||||||
w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
|
w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
|
||||||
|
|||||||
Reference in New Issue
Block a user