增加基线值为0时处理逻辑,将0替换为默认值。

This commit is contained in:
wanglihui
2021-08-17 18:56:53 +08:00
parent 9bda526d48
commit c957f3ec1c
7 changed files with 452 additions and 40 deletions

View File

@@ -197,6 +197,11 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>

View File

@@ -45,4 +45,17 @@ public class CommonConfig {
public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.severe.threshold");
public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.critical.threshold");
public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri");
public static final String BIFANG_SERVER_USER = CommonConfigurations.getStringProperty("bifang.server.user");
public static final String BIFANG_SERVER_PASSWORD = CommonConfigurations.getStringProperty("bifang.server.password");
public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = CommonConfigurations.getStringProperty("bifang.server.encryptpwd.path");
public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path");
public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path");
public static final int HTTP_POOL_MAX_CONNECTION = CommonConfigurations.getIntProperty("http.pool.max.connection");
public static final int HTTP_POOL_MAX_PER_ROUTE = CommonConfigurations.getIntProperty("http.pool.max.per.route");
public static final int HTTP_POOL_REQUEST_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.request.timeout");
public static final int HTTP_POOL_CONNECT_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.connect.timeout");
public static final int HTTP_POOL_RESPONSE_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.response.timeout");
}

View File

@@ -8,6 +8,7 @@ import com.zdjizhi.utils.IpUtils;
import com.zdjizhi.utils.SnowflakeId;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,7 +23,7 @@ import java.util.*;
public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);
private static Map<String, Map<String,List<Integer>>> baselineMap;
private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap;
private final static int BASELINE_SIZE = 144;
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
@@ -39,12 +40,12 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
String attackType = value.getAttack_type();
logger.debug("当前判断IP{}, 类型: {}",destinationIp,attackType);
if (baselineMap.containsKey(destinationIp)){
List<Integer> baseline = baselineMap.get(destinationIp).get(attackType);
if (baseline != null && baseline.size() == BASELINE_SIZE){
int timeIndex = getCurrentTimeIndex(value.getSketch_start_time());
Integer base = baseline.get(timeIndex);
Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
List<Integer> baselines = floodTypeTup.f0;
if (baselines != null && baselines.size() == BASELINE_SIZE){
Integer base = getBaseValue(baselines,value,floodTypeTup.f1);
long diff = value.getSketch_sessions() - base;
if (diff > 0){
if (diff > 0 && base != 0){
String percent = getDiffPercent(diff, base);
double diffPercentDouble = getDiffPercentDouble(percent);
Severity severity = judgeSeverity(diffPercentDouble);
@@ -85,6 +86,21 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return dosEventLog;
}
private Integer getBaseValue(List<Integer> baselines,DosSketchLog value,int defauleVaule){
try {
int timeIndex = getCurrentTimeIndex(value.getSketch_start_time());
Integer base = baselines.get(timeIndex);
if (base == 0){
logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}",value.getDestination_ip(),value.getAttack_type(),defauleVaule);
base = defauleVaule;
}
return base;
}catch (Exception e){
logger.error("解析baseline数据失败,返回默认值0",e);
return 0;
}
}
private String getConditions(String percent){
return "sessions > "+percent+" of baseline";
}

View File

@@ -1,6 +1,7 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -22,11 +23,16 @@ import java.util.*;
*/
public class HbaseUtils {
private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class);
private static Table table = null;
private static Table table = null;
private static Scan scan = null;
public static Map<String, Map<String,List<Integer>>> baselineMap = new HashMap<>();
public static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
private static ArrayList<String> floodTypeList = new ArrayList<>();
static {
floodTypeList.add("TCP SYN Flood");
floodTypeList.add("UDP Flood");
floodTypeList.add("ICMP Flood");
floodTypeList.add("DNS Amplification");
readFromHbase();
}
@@ -40,7 +46,7 @@ public class HbaseUtils {
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
Connection conn = ConnectionFactory.createConnection(config);
table = conn.getTable(tableName);
scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
@@ -49,60 +55,61 @@ public class HbaseUtils {
public static void main(String[] args) {
Set<String> keySet = baselineMap.keySet();
for (String key:keySet){
Map<String, List<Integer>> stringListMap = baselineMap.get(key);
Set<String> typeSet = stringListMap.keySet();
for (String type:typeSet){
List<Integer> lines = stringListMap.get(type);
if (lines != null){
System.out.println(key+"--"+type+"--"+Arrays.toString(lines.toArray()));
}
}
for (String key : keySet) {
}
System.out.println(baselineMap.size());
}
private static void readFromHbase(){
private static void readFromHbase() {
try {
prepareHbaseEnv();
logger.info("开始读取baseline数据");
ResultScanner rs = table.getScanner(scan);
for (Result result : rs) {
Map<String, List<Integer>> floodTypeMap = new HashMap<>();
Map<String, Tuple2<ArrayList<Integer>, Integer>> floodTypeMap = new HashMap<>();
String rowkey = Bytes.toString(result.getRow());
ArrayList<Integer> tcp = getArraylist(result,"TCP SYN Flood", "session_rate");
ArrayList<Integer> udp = getArraylist(result,"UDP Flood", "session_rate");
ArrayList<Integer> icmp = getArraylist(result,"ICMP Flood", "session_rate");
ArrayList<Integer> dns = getArraylist(result,"DNS Amplification", "session_rate");
floodTypeMap.put("TCP SYN Flood",tcp);
floodTypeMap.put("UDP Flood",udp);
floodTypeMap.put("ICMP Flood",icmp);
floodTypeMap.put("DNS Amplification",dns);
baselineMap.put(rowkey,floodTypeMap);
for (String type:floodTypeList){
ArrayList<Integer> sessionRate = getArraylist(result, type, "session_rate");
Integer defaultValue = getDefaultValue(result, type, "session_rate_default_value");
floodTypeMap.put(type,Tuple2.of(sessionRate, defaultValue));
}
baselineMap.put(rowkey, floodTypeMap);
}
logger.info("格式化baseline数据成功读取IP共{}",baselineMap.size());
}catch (Exception e){
logger.error("读取hbase数据失败",e);
logger.info("格式化baseline数据成功读取IP共{}", baselineMap.size());
} catch (Exception e) {
logger.error("读取hbase数据失败", e);
}
}
private static ArrayList<Integer> getArraylist(Result result,String family,String qualifier) throws IOException {
if (!result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier))){
return null;
private static Integer getDefaultValue(Result result, String family, String qualifier) {
byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
if (value != null){
return Bytes.toInt(value);
}
ArrayWritable w = new ArrayWritable(IntWritable.class);
w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
return fromWritable(w);
return 1;
}
private static ArrayList<Integer> getArraylist(Result result, String family, String qualifier) throws IOException {
if (containsColumn(result, family, qualifier)) {
ArrayWritable w = new ArrayWritable(IntWritable.class);
w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
return fromWritable(w);
}
return null;
}
private static ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = writable.get();
ArrayList<Integer> list = new ArrayList<>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
list.add(((IntWritable) wrt).get());
}
return list;
}
private static boolean containsColumn(Result result, String family, String qualifier) {
return result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
}
}

View File

@@ -0,0 +1,284 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.http.*;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* @author wlh
* http client工具类
*/
public class HttpClientUtils {
/** 全局连接池对象 */
private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager();
private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
/*
* 静态代码块配置连接池信息
*/
static {
// 设置最大连接数
CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
// 设置每个连接的路由数
CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
}
/**
* 获取Http客户端连接对象
* @return Http客户端连接对象
*/
private static CloseableHttpClient getHttpClient() {
// 创建Http请求配置参数
RequestConfig requestConfig = RequestConfig.custom()
// 获取连接超时时间
.setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT)
// 请求超时时间
.setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT)
// 响应超时时间
.setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT)
.build();
/*
* 测出超时重试机制为了防止超时不生效而设置
* 如果直接放回false,不重试
* 这里会根据情况进行判断是否重试
*/
HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
if (executionCount >= 3) {// 如果已经重试了3次就放弃
return false;
}
if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
return true;
}
if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
return false;
}
if (exception instanceof UnknownHostException) {// 目标服务器不可达
return false;
}
if (exception instanceof ConnectTimeoutException) {// 连接被拒绝
return false;
}
if (exception instanceof HttpHostConnectException) {// 连接被拒绝
return false;
}
if (exception instanceof SSLException) {// ssl握手异常
return false;
}
if (exception instanceof InterruptedIOException) {// 超时
return true;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
// 如果请求是幂等的,就再次尝试
return !(request instanceof HttpEntityEnclosingRequest);
};
ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator
(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && "timeout".equalsIgnoreCase(param)) {
return Long.parseLong(value) * 1000;
}
}
return 60 * 1000;//如果没有约定则默认定义时长为60s
};
// 创建httpClient
return HttpClients.custom()
// 把请求相关的超时信息设置到连接客户端
.setDefaultRequestConfig(requestConfig)
// 把请求重试设置到连接客户端
.setRetryHandler(retry)
.setKeepAliveStrategy(myStrategy)
// 配置连接池管理对象
.setConnectionManager(CONN_MANAGER)
.build();
}
/**
* GET请求
*
* @param url 请求地
* @return message
*/
public static String httpGet(String url, Header... headers) {
String msg = "-1";
// 获取客户端连接对象
CloseableHttpClient httpClient = getHttpClient();
CloseableHttpResponse response = null;
try {
URL ul = new URL(url);
URI uri = new URI(ul.getProtocol(),null, ul.getHost(), ul.getPort(), ul.getPath(), ul.getQuery(), null);
logger.info("http get uri {}",uri);
// 创建GET请求对象
HttpGet httpGet = new HttpGet(uri);
if (StringUtil.isNotEmpty(headers)) {
for (Header h : headers) {
httpGet.addHeader(h);
logger.info("request header : {}",h);
}
}
// 执行请求
response = httpClient.execute(httpGet);
int statusCode = response.getStatusLine().getStatusCode();
// 获取响应实体
HttpEntity entity = response.getEntity();
// 获取响应信息
msg = EntityUtils.toString(entity, "UTF-8");
if (statusCode != HttpStatus.SC_OK) {
logger.error("Http get content is :{}" , msg);
}
} catch (URISyntaxException e) {
logger.error("URI 转换错误: {}", e.getMessage());
} catch (ClientProtocolException e) {
logger.error("协议错误: {}", e.getMessage());
} catch (ParseException e) {
logger.error("解析错误: {}", e.getMessage());
} catch (IOException e) {
logger.error("IO错误: {}",e.getMessage());
} finally {
if (null != response) {
try {
EntityUtils.consume(response.getEntity());
response.close();
} catch (IOException e) {
logger.error("释放链接错误: {}", e.getMessage());
}
}
}
return msg;
}
/**
* POST 请求
* @param url url参数
* @param requestBody 请求体
* @return post请求返回结果
*/
public static String httpPost(String url, String requestBody, Header... headers) {
String msg = "-1";
// 获取客户端连接对象
CloseableHttpClient httpClient = getHttpClient();
// 创建POST请求对象
CloseableHttpResponse response = null;
try {
URL ul = new URL(url);
URI uri = new URI(ul.getProtocol(),null, ul.getHost(), ul.getPort(), ul.getPath(), ul.getQuery(), null);
logger.info("http post uri:{} http post body:{}", uri, requestBody);
HttpPost httpPost = new HttpPost(uri);
httpPost.setHeader("Content-Type", "application/json");
if (StringUtil.isNotEmpty(headers)) {
for (Header h : headers) {
httpPost.addHeader(h);
logger.info("request header : {}",h);
}
}
if(StringUtil.isNotBlank(requestBody)) {
byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8);
httpPost.setEntity(new ByteArrayEntity(bytes));
}
response = httpClient.execute(httpPost);
int statusCode = response.getStatusLine().getStatusCode();
// 获取响应实体
HttpEntity entity = response.getEntity();
// 获取响应信息
msg = EntityUtils.toString(entity, "UTF-8");
if (statusCode != HttpStatus.SC_OK) {
logger.error("Http post content is :{}" , msg);
}
} catch (URISyntaxException e) {
logger.error("URI 转换错误: {}", e.getMessage());
} catch (ClientProtocolException e) {
logger.error("协议错误: {}", e.getMessage());
} catch (ParseException e) {
logger.error("解析错误: {}", e.getMessage());
} catch (IOException e) {
logger.error("IO错误: {}", e.getMessage());
} finally {
if (null != response) {
try {
EntityUtils.consumeQuietly(response.getEntity());
response.close();
} catch (IOException e) {
logger.error("释放链接错误: {}", e.getMessage());
}
}
}
return msg;
}
/**
* 拼装url
* url ,参数map
*/
public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map<String, String> params) {
try {
uriBuilder.setPath(path);
for (Map.Entry<String, String> kv : params.entrySet()) {
uriBuilder.setParameter(kv.getKey(),kv.getValue());
}
} catch (Exception e) {
logger.error("拼接url出错,uri : {}, path : {},参数: {}",uriBuilder.toString(),path,params);
}
}
public static void getEncryptpwd(){
}
}

View File

@@ -2,7 +2,7 @@
stream.execution.environment.parallelism=1
#flink任务名一般不变
stream.execution.job.name=dos-detection-job
stream.execution.job.name=DOS-DETECTION-APPLICATION
#输入kafka并行度大小
kafka.input.parallelism=1
@@ -82,3 +82,35 @@ baseline.sessions.warning.threshold=0.5
baseline.sessions.major.threshold=1
baseline.sessions.severe.threshold=3
baseline.sessions.critical.threshold=8
#bifang服务访问地址
bifang.server.uri=http://192.168.44.3:80
#访问bifang服务用户名密码
bifang.server.user=admin
bifang.server.password=admin
#加密密码路径信息
bifang.server.encryptpwd.path=/v1/user/encryptpwd
#登录bifang服务路径信息
bifang.server.login.path=/v1/user/login
#获取静态阈值路径信息
bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold
#http请求相关参数
#最大连接数
http.pool.max.connection=400
#单路由最大连接数
http.pool.max.per.route=80
#向服务端请求超时时间设置(单位:毫秒)
http.pool.request.timeout=60000
#向服务端连接超时时间设置(单位:毫秒)
http.pool.connect.timeout=60000
#服务端响应超时时间设置(单位:毫秒)
http.pool.response.timeout=60000

View File

@@ -0,0 +1,55 @@
package com.zdjizhi.common;
import com.zdjizhi.utils.HttpClientUtils;
import com.zdjizhi.utils.JsonMapper;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class HttpTest {
public static void main(String[] args) throws Exception {
CloseableHttpClient httpclient = HttpClients.custom().build();
URIBuilder uriBuilder = new URIBuilder("http://192.168.44.3:80");
HashMap<String, String> parms = new HashMap<>();
parms.put("password",CommonConfig.BIFANG_SERVER_PASSWORD);
HttpClientUtils.setUrlWithParams(uriBuilder,CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH,parms);
System.out.println(uriBuilder.toString());
URI uri = uriBuilder.build();
System.out.println(HttpClientUtils.httpGet(uri.toString()));
/*
URI uri = uriBuilder
.setPath("/v1/user/encryptpwd")
.setParameter("password", "admin").build();
System.out.println(uri.toString());
HttpGet httpGet = new HttpGet(uri);
CloseableHttpResponse response = null;
try {
// 执行http get请求
response = httpclient.execute(httpGet);
// 判断返回状态是否为200
if (response.getStatusLine().getStatusCode() == 200) {
String content = EntityUtils.toString(response.getEntity(), "UTF-8");
System.out.println(content);
HashMap<String, Object> map = (HashMap<String, Object>) JsonMapper.fromJsonString(content, Object.class);
}
} finally {
if (response != null) {
response.close();
}
httpclient.close();
}
*/
}
}