diff --git a/pom.xml b/pom.xml index d44c210..7697087 100644 --- a/pom.xml +++ b/pom.xml @@ -197,6 +197,11 @@ + + org.apache.httpcomponents + httpclient + 4.5.6 + org.apache.flink diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index 2ff146d..b0b1f97 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -45,4 +45,17 @@ public class CommonConfig { public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.severe.threshold"); public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.critical.threshold"); + public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri"); + public static final String BIFANG_SERVER_USER = CommonConfigurations.getStringProperty("bifang.server.user"); + public static final String BIFANG_SERVER_PASSWORD = CommonConfigurations.getStringProperty("bifang.server.password"); + public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = CommonConfigurations.getStringProperty("bifang.server.encryptpwd.path"); + public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path"); + public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path"); + + public static final int HTTP_POOL_MAX_CONNECTION = CommonConfigurations.getIntProperty("http.pool.max.connection"); + public static final int HTTP_POOL_MAX_PER_ROUTE = CommonConfigurations.getIntProperty("http.pool.max.per.route"); + public static final int HTTP_POOL_REQUEST_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.request.timeout"); + public static final int HTTP_POOL_CONNECT_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.connect.timeout"); + public static final int HTTP_POOL_RESPONSE_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.response.timeout"); + } diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index 453b72c..9e06b09 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -8,6 +8,7 @@ import com.zdjizhi.utils.IpUtils; import com.zdjizhi.utils.SnowflakeId; import org.apache.commons.lang.StringUtils; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +23,7 @@ import java.util.*; public class DosDetection extends RichMapFunction { private static final Logger logger = LoggerFactory.getLogger(DosDetection.class); - private static Map>> baselineMap; + private static Map, Integer>>> baselineMap; private final static int BASELINE_SIZE = 144; private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); @@ -39,12 +40,12 @@ public class DosDetection extends RichMapFunction { String attackType = value.getAttack_type(); logger.debug("当前判断IP:{}, 类型: {}",destinationIp,attackType); if (baselineMap.containsKey(destinationIp)){ - List baseline = baselineMap.get(destinationIp).get(attackType); - if (baseline != null && baseline.size() == BASELINE_SIZE){ - int timeIndex = getCurrentTimeIndex(value.getSketch_start_time()); - Integer base = baseline.get(timeIndex); + Tuple2, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType); + List baselines = floodTypeTup.f0; + if (baselines != null && baselines.size() == BASELINE_SIZE){ + Integer base = getBaseValue(baselines,value,floodTypeTup.f1); long diff = value.getSketch_sessions() - base; - if (diff > 0){ + if (diff > 0 && base != 0){ String percent = getDiffPercent(diff, base); double diffPercentDouble = getDiffPercentDouble(percent); Severity severity = judgeSeverity(diffPercentDouble); @@ -85,6 +86,21 @@ public class DosDetection extends RichMapFunction { return dosEventLog; } + private Integer getBaseValue(List baselines,DosSketchLog value,int defauleVaule){ + try { + int timeIndex = getCurrentTimeIndex(value.getSketch_start_time()); + Integer base = baselines.get(timeIndex); + if (base == 0){ + logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}",value.getDestination_ip(),value.getAttack_type(),defauleVaule); + base = defauleVaule; + } + return base; + }catch (Exception e){ + logger.error("解析baseline数据失败,返回默认值0",e); + return 0; + } + } + private String getConditions(String percent){ return "sessions > "+percent+" of baseline"; } diff --git a/src/main/java/com/zdjizhi/utils/HbaseUtils.java b/src/main/java/com/zdjizhi/utils/HbaseUtils.java index 87921bd..5e91b5a 100644 --- a/src/main/java/com/zdjizhi/utils/HbaseUtils.java +++ b/src/main/java/com/zdjizhi/utils/HbaseUtils.java @@ -1,6 +1,7 @@ package com.zdjizhi.utils; import com.zdjizhi.common.CommonConfig; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -22,11 +23,16 @@ import java.util.*; */ public class HbaseUtils { private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class); - private static Table table = null; + private static Table table = null; private static Scan scan = null; - public static Map>> baselineMap = new HashMap<>(); + public static Map, Integer>>> baselineMap = new HashMap<>(); + private static ArrayList floodTypeList = new ArrayList<>(); static { + floodTypeList.add("TCP SYN Flood"); + floodTypeList.add("UDP Flood"); + floodTypeList.add("ICMP Flood"); + floodTypeList.add("DNS Amplification"); readFromHbase(); } @@ -40,7 +46,7 @@ public class HbaseUtils { config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT); config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); - TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME); + TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME); Connection conn = ConnectionFactory.createConnection(config); table = conn.getTable(tableName); scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM); @@ -49,60 +55,61 @@ public class HbaseUtils { public static void main(String[] args) { Set keySet = baselineMap.keySet(); - for (String key:keySet){ - Map> stringListMap = baselineMap.get(key); - Set typeSet = stringListMap.keySet(); - for (String type:typeSet){ - List lines = stringListMap.get(type); - if (lines != null){ - System.out.println(key+"--"+type+"--"+Arrays.toString(lines.toArray())); - } - } + for (String key : keySet) { + } System.out.println(baselineMap.size()); } - private static void readFromHbase(){ + private static void readFromHbase() { try { prepareHbaseEnv(); logger.info("开始读取baseline数据"); ResultScanner rs = table.getScanner(scan); for (Result result : rs) { - Map> floodTypeMap = new HashMap<>(); + Map, Integer>> floodTypeMap = new HashMap<>(); String rowkey = Bytes.toString(result.getRow()); - ArrayList tcp = getArraylist(result,"TCP SYN Flood", "session_rate"); - ArrayList udp = getArraylist(result,"UDP Flood", "session_rate"); - ArrayList icmp = getArraylist(result,"ICMP Flood", "session_rate"); - ArrayList dns = getArraylist(result,"DNS Amplification", "session_rate"); - floodTypeMap.put("TCP SYN Flood",tcp); - floodTypeMap.put("UDP Flood",udp); - floodTypeMap.put("ICMP Flood",icmp); - floodTypeMap.put("DNS Amplification",dns); - baselineMap.put(rowkey,floodTypeMap); + for (String type:floodTypeList){ + ArrayList sessionRate = getArraylist(result, type, "session_rate"); + Integer defaultValue = getDefaultValue(result, type, "session_rate_default_value"); + floodTypeMap.put(type,Tuple2.of(sessionRate, defaultValue)); + } + baselineMap.put(rowkey, floodTypeMap); } - logger.info("格式化baseline数据成功,读取IP共:{}",baselineMap.size()); - }catch (Exception e){ - logger.error("读取hbase数据失败",e); + logger.info("格式化baseline数据成功,读取IP共:{}", baselineMap.size()); + } catch (Exception e) { + logger.error("读取hbase数据失败", e); } } - private static ArrayList getArraylist(Result result,String family,String qualifier) throws IOException { - if (!result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier))){ - return null; + private static Integer getDefaultValue(Result result, String family, String qualifier) { + byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)); + if (value != null){ + return Bytes.toInt(value); } - ArrayWritable w = new ArrayWritable(IntWritable.class); - w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier))))); - return fromWritable(w); + return 1; + } + + private static ArrayList getArraylist(Result result, String family, String qualifier) throws IOException { + if (containsColumn(result, family, qualifier)) { + ArrayWritable w = new ArrayWritable(IntWritable.class); + w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier))))); + return fromWritable(w); + } + return null; } private static ArrayList fromWritable(ArrayWritable writable) { Writable[] writables = writable.get(); ArrayList list = new ArrayList<>(writables.length); for (Writable wrt : writables) { - list.add(((IntWritable)wrt).get()); + list.add(((IntWritable) wrt).get()); } return list; } + private static boolean containsColumn(Result result, String family, String qualifier) { + return result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); + } } diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java new file mode 100644 index 0000000..0e68fbf --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java @@ -0,0 +1,284 @@ +package com.zdjizhi.utils; + +import com.zdjizhi.common.CommonConfig; +import org.apache.http.*; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.HttpRequestRetryHandler; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.*; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.conn.ConnectionKeepAliveStrategy; +import org.apache.http.conn.HttpHostConnectException; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicHeaderElementIterator; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLHandshakeException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +/** + * @author wlh + * http client工具类 + */ +public class HttpClientUtils { + /** 全局连接池对象 */ + private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager(); + + private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); + + /* + * 静态代码块配置连接池信息 + */ + static { + + // 设置最大连接数 + CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION); + // 设置每个连接的路由数 + CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE); + + } + + /** + * 获取Http客户端连接对象 + * @return Http客户端连接对象 + */ + private static CloseableHttpClient getHttpClient() { + // 创建Http请求配置参数 + RequestConfig requestConfig = RequestConfig.custom() + // 获取连接超时时间 + .setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT) + // 请求超时时间 + .setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT) + // 响应超时时间 + .setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT) + .build(); + + /* + * 测出超时重试机制为了防止超时不生效而设置 + * 如果直接放回false,不重试 + * 这里会根据情况进行判断是否重试 + */ + HttpRequestRetryHandler retry = (exception, executionCount, context) -> { + if (executionCount >= 3) {// 如果已经重试了3次,就放弃 + return false; + } + if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 + return true; + } + if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 + return false; + } + if (exception instanceof UnknownHostException) {// 目标服务器不可达 + return false; + } + if (exception instanceof ConnectTimeoutException) {// 连接被拒绝 + return false; + } + if (exception instanceof HttpHostConnectException) {// 连接被拒绝 + return false; + } + if (exception instanceof SSLException) {// ssl握手异常 + return false; + } + if (exception instanceof InterruptedIOException) {// 超时 + return true; + } + HttpClientContext clientContext = HttpClientContext.adapt(context); + HttpRequest request = clientContext.getRequest(); + // 如果请求是幂等的,就再次尝试 + return !(request instanceof HttpEntityEnclosingRequest); + }; + + + ConnectionKeepAliveStrategy myStrategy = (response, context) -> { + HeaderElementIterator it = new BasicHeaderElementIterator + (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); + while (it.hasNext()) { + HeaderElement he = it.nextElement(); + String param = he.getName(); + String value = he.getValue(); + if (value != null && "timeout".equalsIgnoreCase(param)) { + return Long.parseLong(value) * 1000; + } + } + return 60 * 1000;//如果没有约定,则默认定义时长为60s + }; + + // 创建httpClient + return HttpClients.custom() + // 把请求相关的超时信息设置到连接客户端 + .setDefaultRequestConfig(requestConfig) + // 把请求重试设置到连接客户端 + .setRetryHandler(retry) + .setKeepAliveStrategy(myStrategy) + // 配置连接池管理对象 + .setConnectionManager(CONN_MANAGER) + .build(); + } + + + /** + * GET请求 + * + * @param url 请求地 + * @return message + */ + public static String httpGet(String url, Header... headers) { + String msg = "-1"; + + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(); + CloseableHttpResponse response = null; + + try { + + URL ul = new URL(url); + + URI uri = new URI(ul.getProtocol(),null, ul.getHost(), ul.getPort(), ul.getPath(), ul.getQuery(), null); + logger.info("http get uri {}",uri); + // 创建GET请求对象 + HttpGet httpGet = new HttpGet(uri); + + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpGet.addHeader(h); + logger.info("request header : {}",h); + } + } + // 执行请求 + response = httpClient.execute(httpGet); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + + if (statusCode != HttpStatus.SC_OK) { + logger.error("Http get content is :{}" , msg); + } + + } catch (URISyntaxException e) { + logger.error("URI 转换错误: {}", e.getMessage()); + } catch (ClientProtocolException e) { + logger.error("协议错误: {}", e.getMessage()); + } catch (ParseException e) { + logger.error("解析错误: {}", e.getMessage()); + } catch (IOException e) { + logger.error("IO错误: {}",e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consume(response.getEntity()); + response.close(); + } catch (IOException e) { + logger.error("释放链接错误: {}", e.getMessage()); + + } + } + } + + return msg; + } + /** + * POST 请求 + * @param url url参数 + * @param requestBody 请求体 + * @return post请求返回结果 + */ + public static String httpPost(String url, String requestBody, Header... headers) { + String msg = "-1"; + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(); + + // 创建POST请求对象 + CloseableHttpResponse response = null; + try { + + URL ul = new URL(url); + + URI uri = new URI(ul.getProtocol(),null, ul.getHost(), ul.getPort(), ul.getPath(), ul.getQuery(), null); + + logger.info("http post uri:{}, http post body:{}", uri, requestBody); + + HttpPost httpPost = new HttpPost(uri); + httpPost.setHeader("Content-Type", "application/json"); + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpPost.addHeader(h); + logger.info("request header : {}",h); + } + } + + if(StringUtil.isNotBlank(requestBody)) { + byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8); + httpPost.setEntity(new ByteArrayEntity(bytes)); + } + + response = httpClient.execute(httpPost); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + + if (statusCode != HttpStatus.SC_OK) { + logger.error("Http post content is :{}" , msg); + } + } catch (URISyntaxException e) { + logger.error("URI 转换错误: {}", e.getMessage()); + } catch (ClientProtocolException e) { + logger.error("协议错误: {}", e.getMessage()); + } catch (ParseException e) { + logger.error("解析错误: {}", e.getMessage()); + } catch (IOException e) { + logger.error("IO错误: {}", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consumeQuietly(response.getEntity()); + response.close(); + } catch (IOException e) { + logger.error("释放链接错误: {}", e.getMessage()); + + } + } + } + return msg; + } + + /** + * 拼装url + * url ,参数map + */ + public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map params) { + try { + uriBuilder.setPath(path); + for (Map.Entry kv : params.entrySet()) { + uriBuilder.setParameter(kv.getKey(),kv.getValue()); + } + } catch (Exception e) { + logger.error("拼接url出错,uri : {}, path : {},参数: {}",uriBuilder.toString(),path,params); + } + } + + public static void getEncryptpwd(){ + + } + +} diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 4a96a93..3ff6bd2 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -2,7 +2,7 @@ stream.execution.environment.parallelism=1 #flink任务名,一般不变 -stream.execution.job.name=dos-detection-job +stream.execution.job.name=DOS-DETECTION-APPLICATION #输入kafka并行度大小 kafka.input.parallelism=1 @@ -82,3 +82,35 @@ baseline.sessions.warning.threshold=0.5 baseline.sessions.major.threshold=1 baseline.sessions.severe.threshold=3 baseline.sessions.critical.threshold=8 + +#bifang服务访问地址 +bifang.server.uri=http://192.168.44.3:80 + +#访问bifang服务用户名密码 +bifang.server.user=admin +bifang.server.password=admin + +#加密密码路径信息 +bifang.server.encryptpwd.path=/v1/user/encryptpwd + +#登录bifang服务路径信息 +bifang.server.login.path=/v1/user/login + +#获取静态阈值路径信息 +bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold + +#http请求相关参数 +#最大连接数 +http.pool.max.connection=400 + +#单路由最大连接数 +http.pool.max.per.route=80 + +#向服务端请求超时时间设置(单位:毫秒) +http.pool.request.timeout=60000 + +#向服务端连接超时时间设置(单位:毫秒) +http.pool.connect.timeout=60000 + +#服务端响应超时时间设置(单位:毫秒) +http.pool.response.timeout=60000 \ No newline at end of file diff --git a/src/test/java/com/zdjizhi/common/HttpTest.java b/src/test/java/com/zdjizhi/common/HttpTest.java new file mode 100644 index 0000000..1452037 --- /dev/null +++ b/src/test/java/com/zdjizhi/common/HttpTest.java @@ -0,0 +1,55 @@ +package com.zdjizhi.common; + +import com.zdjizhi.utils.HttpClientUtils; +import com.zdjizhi.utils.JsonMapper; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +public class HttpTest { + public static void main(String[] args) throws Exception { + CloseableHttpClient httpclient = HttpClients.custom().build(); + URIBuilder uriBuilder = new URIBuilder("http://192.168.44.3:80"); + HashMap parms = new HashMap<>(); + parms.put("password",CommonConfig.BIFANG_SERVER_PASSWORD); + HttpClientUtils.setUrlWithParams(uriBuilder,CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH,parms); + System.out.println(uriBuilder.toString()); + + URI uri = uriBuilder.build(); + System.out.println(HttpClientUtils.httpGet(uri.toString())); + + /* + URI uri = uriBuilder + .setPath("/v1/user/encryptpwd") + .setParameter("password", "admin").build(); + + System.out.println(uri.toString()); + + + HttpGet httpGet = new HttpGet(uri); + CloseableHttpResponse response = null; + try { + // 执行http get请求 + response = httpclient.execute(httpGet); + // 判断返回状态是否为200 + if (response.getStatusLine().getStatusCode() == 200) { + String content = EntityUtils.toString(response.getEntity(), "UTF-8"); + System.out.println(content); + HashMap map = (HashMap) JsonMapper.fromJsonString(content, Object.class); + } + } finally { + if (response != null) { + response.close(); + } + httpclient.close(); + } +*/ + } +}