diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java
index fe684b9..2ff146d 100644
--- a/src/main/java/com/zdjizhi/common/CommonConfig.java
+++ b/src/main/java/com/zdjizhi/common/CommonConfig.java
@@ -29,7 +29,7 @@ public class CommonConfig {
     public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num");
 
     public static final int FLINK_FIRST_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.first.agg.parallelism");
-    public static final int FLINK_SECOND_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.second.agg.parallelism");
+    public static final int FLINK_DETECTION_MAP_PARALLELISM = CommonConfigurations.getIntProperty("flink.detection.map.parallelism");
     public static final int FLINK_WATERMARK_MAX_ORDERNESS = CommonConfigurations.getIntProperty("flink.watermark.max.orderness");
     public static final int FLINK_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("flink.window.max.time");
diff --git a/src/main/java/com/zdjizhi/common/DosMetricsLog.java b/src/main/java/com/zdjizhi/common/DosMetricsLog.java
index ac5050b..6f9336a 100644
--- a/src/main/java/com/zdjizhi/common/DosMetricsLog.java
+++ b/src/main/java/com/zdjizhi/common/DosMetricsLog.java
@@ -5,8 +5,6 @@ import java.io.Serializable;
 public class DosMetricsLog implements Serializable {
 
     private long sketch_start_time;
-    private String common_sled_ip;
-    private String common_data_center;
     private String attack_type;
     private String destination_ip;
     private long session_rate;
@@ -30,22 +28,6 @@ public class DosMetricsLog implements Serializable {
         this.sketch_start_time = sketch_start_time;
     }
 
-    public String getCommon_sled_ip() {
-        return common_sled_ip;
-    }
-
-    public void setCommon_sled_ip(String common_sled_ip) {
-        this.common_sled_ip = common_sled_ip;
-    }
-
-    public String getCommon_data_center() {
-        return common_data_center;
-    }
-
-    public void setCommon_data_center(String common_data_center) {
-        this.common_data_center = common_data_center;
-    }
-
     public String getAttack_type() {
         return attack_type;
     }
@@ -90,8 +72,6 @@ public class DosMetricsLog implements Serializable {
     public String toString() {
         return "DosMetricsLog{" +
                 "sketch_start_time=" + sketch_start_time +
-                ", common_sled_ip='" + common_sled_ip + '\'' +
-                ", common_data_center='" + common_data_center + '\'' +
                 ", attack_type='" + attack_type + '\'' +
                 ", destination_ip='" + destination_ip + '\'' +
                 ", session_rate=" + session_rate +
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java
index b33940a..4f3c7b5 100644
--- a/src/main/java/com/zdjizhi/etl/DosDetection.java
+++ b/src/main/java/com/zdjizhi/etl/DosDetection.java
@@ -3,47 +3,37 @@ package com.zdjizhi.etl;
 import com.zdjizhi.common.CommonConfig;
 import com.zdjizhi.common.DosEventLog;
 import com.zdjizhi.common.DosSketchLog;
+import com.zdjizhi.utils.HbaseUtils;
 import com.zdjizhi.utils.IpUtils;
 import com.zdjizhi.utils.SnowflakeId;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
 import java.text.NumberFormat;
 import java.text.ParseException;
 import java.util.*;
 
+/**
+ * @author wlh
+ */
 public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
     private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);
 
-    private Connection conn = null;
-    private Table table = null;
-    private Scan scan = null;
-    private Map<String, Map<String, List<Integer>>> baselineMap = new HashMap<>();
+    private static Map<String, Map<String, List<Integer>>> baselineMap;
    private final static int BASELINE_SIZE = 144;
     private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
 
     @Override
-    public void open(Configuration parameters) throws Exception {
-        readFromHbase();
+    public void open(Configuration parameters){
+        baselineMap = HbaseUtils.baselineMap;
         PERCENT_INSTANCE.setMinimumFractionDigits(2);
     }
 
     @Override
-    public DosEventLog map(DosSketchLog value) throws Exception {
+    public DosEventLog map(DosSketchLog value){
         try {
             String destinationIp = value.getDestination_ip();
             String attackType = value.getAttack_type();
@@ -77,61 +67,6 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
         return null;
     }
 
-    private void prepareHbaseEnv() throws IOException {
-        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
-
-        config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
-        config.set("hbase.client.retries.number", "3");
-        config.set("hbase.bulkload.retries.number", "3");
-        config.set("zookeeper.recovery.retry", "3");
-        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
-        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
-
-        TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
-        conn = ConnectionFactory.createConnection(config);
-        table = conn.getTable(tableName);
-        scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
-        logger.info("Connected to HBase, reading baseline data");
-    }
-
-    private void readFromHbase() throws IOException {
-        prepareHbaseEnv();
-        logger.info("Start reading baseline data");
-        ResultScanner rs = table.getScanner(scan);
-        for (Result result : rs) {
-            Map<String, List<Integer>> floodTypeMap = new HashMap<>();
-            String rowkey = Bytes.toString(result.getRow());
-            ArrayList<Integer> tcp = getArraylist(result,"TCP SYN Flood", "session_num");
-            ArrayList<Integer> udp = getArraylist(result,"UDP Flood", "session_num");
-            ArrayList<Integer> icmp = getArraylist(result,"ICMP Flood", "session_num");
-            ArrayList<Integer> dns = getArraylist(result,"DNS Amplification", "session_num");
-            floodTypeMap.put("TCP SYN Flood",tcp);
-            floodTypeMap.put("UDP Flood",udp);
-            floodTypeMap.put("ICMP Flood",icmp);
-            floodTypeMap.put("DNS Amplification",dns);
-            baselineMap.put(rowkey,floodTypeMap);
-        }
-        logger.info("Baseline data formatted successfully, total IPs read: {}",baselineMap.size());
-    }
-
-    private static ArrayList<Integer> getArraylist(Result result,String family,String qualifier) throws IOException {
-        if (!result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier))){
-            return null;
-        }
-        ArrayWritable w = new ArrayWritable(IntWritable.class);
-        w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
-        return fromWritable(w);
-    }
-
-    private static ArrayList<Integer> fromWritable(ArrayWritable writable) {
-        Writable[] writables = writable.get();
-        ArrayList<Integer> list = new ArrayList<>(writables.length);
-        for (Writable wrt : writables) {
-            list.add(((IntWritable)wrt).get());
-        }
-        return list;
-    }
-
     private DosEventLog getResult(DosSketchLog value, Severity severity, String percent){
         DosEventLog dosEventLog = new DosEventLog();
         dosEventLog.setLog_id(SnowflakeId.generateId());
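With the HBase plumbing moved out, DosDetection.open() simply points its static baselineMap at HbaseUtils.baselineMap, which a static initializer fills once per JVM, so all detection subtasks in a TaskManager share one copy of the baseline instead of each operator instance scanning HBase. The detection logic inside map() is elided from this diff; the sketch below is only a hypothetical illustration of a lookup against that structure — BASELINE_SIZE = 144 comes from the class above (one slot per ten minutes of a day, ignoring time zones), while every other name and the thresholding itself are invented for the example:

    // Hypothetical lookup sketch; the real map() body is not shown in this diff.
    // baselineMap is keyed by destination IP, then by attack type, as built in HbaseUtils.
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BaselineLookupSketch {
        private static final int BASELINE_SIZE = 144; // 24h in 10-minute slots, as in DosDetection

        static Map<String, Map<String, List<Integer>>> baselineMap = new HashMap<>();

        static Integer baselineFor(String destinationIp, String attackType, long epochSeconds) {
            Map<String, List<Integer>> byType = baselineMap.get(destinationIp);
            if (byType == null) {
                return null; // no baseline recorded for this destination IP
            }
            List<Integer> slots = byType.get(attackType);
            if (slots == null || slots.size() < BASELINE_SIZE) {
                return null; // no complete curve for this attack type
            }
            int slot = (int) ((epochSeconds % 86400) / 600); // 10-minute slot of the day (UTC)
            return slots.get(slot);
        }

        public static void main(String[] args) {
            baselineMap.put("10.0.0.1",
                    Collections.singletonMap("UDP Flood", Collections.nCopies(BASELINE_SIZE, 500)));
            System.out.println(baselineFor("10.0.0.1", "UDP Flood", 7200L)); // slot 12 -> 500
        }
    }

Because the map is static, the sharing holds per classloader: each TaskManager JVM triggers exactly one HBase scan when HbaseUtils is first loaded.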
diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
index 7818ced..f43a420 100644
--- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
@@ -3,7 +3,7 @@ package com.zdjizhi.etl;
 import com.zdjizhi.common.CommonConfig;
 import com.zdjizhi.common.DosSketchLog;
 import org.apache.commons.lang.StringUtils;
-import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -18,11 +18,11 @@ import static com.zdjizhi.sink.OutputStreamSink.outputTag;
 /**
  * @author 94976
  */
-public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple4<String, String, String, String>, TimeWindow> {
+public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String, String>, TimeWindow> {
     private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
 
     @Override
-    public void process(Tuple4<String, String, String, String> keys,
+    public void process(Tuple2<String, String> keys,
                         Context context, Iterable<DosSketchLog> elements, Collector<DosSketchLog> out) {
         DosSketchLog middleResult = getMiddleResult(keys, elements);
@@ -37,16 +37,14 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple4<String, String, String, String>, TimeWindow> {
 
-    private DosSketchLog getMiddleResult(Tuple4<String, String, String, String> keys,Iterable<DosSketchLog> elements){
+    private DosSketchLog getMiddleResult(Tuple2<String, String> keys,Iterable<DosSketchLog> elements){
         DosSketchLog midResuleLog = new DosSketchLog();
         Tuple6 values = sketchAggregate(elements);
         try {
             if (values != null){
-                midResuleLog.setCommon_sled_ip(keys.f0);
-                midResuleLog.setCommon_data_center(keys.f1);
-                midResuleLog.setDestination_ip(keys.f3);
-                midResuleLog.setAttack_type(keys.f2);
+                midResuleLog.setAttack_type(keys.f0);
+                midResuleLog.setDestination_ip(keys.f1);
                 midResuleLog.setSketch_start_time(values.f4);
                 midResuleLog.setSketch_duration(values.f5);
                 midResuleLog.setSource_ip(values.f3);
diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
index 18b4a78..8560aa2 100644
--- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
+++ b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java
@@ -17,6 +17,9 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 
+/**
+ * @author wlh
+ */
 public class ParseSketchLog {
     private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
 
@@ -37,20 +40,16 @@ public class ParseSketchLog {
 
     private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
         @Override
-        public void flatMap(String s, Collector<DosSketchLog> collector) throws Exception {
+        public void flatMap(String s, Collector<DosSketchLog> collector) {
             try {
                 if (StringUtil.isNotBlank(s)){
                     HashMap sketchSource = (HashMap) JsonMapper.fromJsonString(s, Object.class);
-                    String commonSledIp = sketchSource.get("common_sled_ip").toString();
-                    String commonDataCenter = sketchSource.get("common_data_center").toString();
                     long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
                     long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
                     String attackType = sketchSource.get("attack_type").toString();
                     ArrayList<HashMap<String, Object>> reportIpList = (ArrayList<HashMap<String, Object>>) sketchSource.get("report_ip_list");
                     for (HashMap<String, Object> obj : reportIpList) {
                         DosSketchLog dosSketchLog = new DosSketchLog();
-                        dosSketchLog.setCommon_sled_ip(commonSledIp);
-                        dosSketchLog.setCommon_data_center(commonDataCenter);
                         dosSketchLog.setSketch_start_time(sketchStartTime);
                         dosSketchLog.setSketch_duration(sketchDuration);
                         dosSketchLog.setAttack_type(attackType);
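FlatSketchLog implies a specific input envelope: top-level sketch metadata plus a report_ip_list array that is fanned out into one DosSketchLog per entry (the per-entry fields are consumed in the part of the loop this diff does not show). A hedged reconstruction of that shape, limited to the fields visible above, with illustrative values and the per-entry objects left elided:

    {
      "sketch_start_time": 1629083460,
      "sketch_duration": 10,
      "attack_type": "UDP Flood",
      "report_ip_list": [
        { ... },
        { ... }
      ]
    }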
diff --git a/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java b/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java
index 13fd24a..2fe8524 100644
--- a/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java
+++ b/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java
@@ -14,8 +14,6 @@ class TrafficServerIpMetrics {
     static DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) {
         DosMetricsLog dosMetricsLog = new DosMetricsLog();
         dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis()/1000));
-        dosMetricsLog.setCommon_sled_ip(midResuleLog.getCommon_sled_ip());
-        dosMetricsLog.setCommon_data_center(midResuleLog.getCommon_data_center());
         dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip());
         dosMetricsLog.setAttack_type(midResuleLog.getAttack_type());
         dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions());
@@ -35,7 +33,7 @@ class TrafficServerIpMetrics {
     }
 
     public static void main(String[] args) {
-//        System.out.println(getPartitionNumByIp("146.177.223.43"));
+        System.out.println(getPartitionNumByIp("146.177.223.43"));
         System.out.println("146.177.223.43".hashCode());
     }
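The re-enabled main above prints getPartitionNumByIp next to the raw String.hashCode of the same IP. The method body is outside this diff; the sketch below is a hypothetical implementation of hash-based bucketing over destination.ip.partition.num (10000 in common.properties), not the project's actual code. The sign mask matters because String.hashCode may be negative — presumably why main prints both values side by side:

    // Hypothetical sketch; the real getPartitionNumByIp is not part of this diff.
    public class IpPartitionSketch {
        private static final int PARTITION_NUM = 10000; // destination.ip.partition.num

        static int getPartitionNumByIp(String ip) {
            // Mask the sign bit so a negative hashCode still yields a bucket in [0, PARTITION_NUM)
            return (ip.hashCode() & Integer.MAX_VALUE) % PARTITION_NUM;
        }

        public static void main(String[] args) {
            System.out.println(getPartitionNumByIp("146.177.223.43"));
            System.out.println("146.177.223.43".hashCode()); // may be negative
        }
    }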
diff --git a/src/main/java/com/zdjizhi/main/DosDetectionApplication.java b/src/main/java/com/zdjizhi/main/DosDetectionApplication.java
index 7cac197..e78d462 100644
--- a/src/main/java/com/zdjizhi/main/DosDetectionApplication.java
+++ b/src/main/java/com/zdjizhi/main/DosDetectionApplication.java
@@ -2,6 +2,10 @@ package com.zdjizhi.main;
 
 import com.zdjizhi.sink.OutputStreamSink;
 
+/**
+ * @author wlh
+ * Main entry point of the application
+ */
 public class DosDetectionApplication {
 
     public static void main(String[] args) {
diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
index f66008b..75e0708 100644
--- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
+++ b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
@@ -8,11 +8,8 @@ import com.zdjizhi.etl.DosDetection;
 import com.zdjizhi.etl.EtlProcessFunction;
 import com.zdjizhi.etl.ParseSketchLog;
 import com.zdjizhi.utils.FlinkEnvironmentUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.datastream.*;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -20,8 +17,6 @@ import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-
 /**
  * @author 94976
  */
@@ -43,11 +38,7 @@ public class OutputStreamSink {
     }
 
     private static SingleOutputStreamOperator<DosEventLog> getOutputSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
-
-        return middleStream.keyBy(new SecondKeySelector())
-                .reduce(new SecondReduceFunc())
-                .map(new DosDetection())
-                .setParallelism(CommonConfig.FLINK_SECOND_AGG_PARALLELISM);
+        return middleStream.map(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
     }
 
     private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){
@@ -58,56 +49,13 @@ public class OutputStreamSink {
                 .setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM);
     }
 
-    private static String groupUniqSourceIp(String sourceIp1,String sourceIp2){
-        HashSet<String> sourceIpSet = new HashSet<>();
-        Collections.addAll(sourceIpSet, (sourceIp1 + "," + sourceIp2).split(","));
-        if (sourceIpSet.size() > CommonConfig.SOURCE_IP_LIST_LIMIT){
-            return StringUtils.join(takeUniqLimit(sourceIpSet,CommonConfig.SOURCE_IP_LIST_LIMIT),",");
-        }
-        return StringUtils.join(sourceIpSet,",");
-    }
-
-    private static <T> Collection<T> takeUniqLimit(Collection<T> collection, int limit){
-        int i =0;
-        Collection<T> newSet = new HashSet<>();
-        for (T t:collection){
-            if (i < limit){
-                newSet.add(t);
-                i += 1;
-            }
-        }
-        return newSet;
-    }
-
-    private static class FirstKeySelector implements KeySelector<DosSketchLog, Tuple4<String, String, String, String>>{
+    private static class FirstKeySelector implements KeySelector<DosSketchLog, Tuple2<String, String>>{
         @Override
-        public Tuple4<String, String, String, String> getKey(DosSketchLog dosSketchLog) throws Exception {
-            return Tuple4.of(
-                    dosSketchLog.getCommon_sled_ip(),
-                    dosSketchLog.getCommon_data_center(),
-                    dosSketchLog.getAttack_type(),
-                    dosSketchLog.getDestination_ip());
-        }
-    }
-
-    private static class SecondKeySelector implements KeySelector<DosSketchLog, Tuple2<String, String>> {
-        @Override
-        public Tuple2<String, String> getKey(DosSketchLog dosSketchLog) throws Exception {
+        public Tuple2<String, String> getKey(DosSketchLog dosSketchLog){
             return Tuple2.of(
                     dosSketchLog.getAttack_type(),
                     dosSketchLog.getDestination_ip());
         }
     }
 
-    private static class SecondReduceFunc implements ReduceFunction<DosSketchLog> {
-        @Override
-        public DosSketchLog reduce(DosSketchLog value1, DosSketchLog value2) throws Exception {
-            value1.setSketch_sessions((value1.getSketch_sessions()+value2.getSketch_sessions())/2);
-            value1.setSketch_bytes((value1.getSketch_bytes()+value2.getSketch_bytes())/2);
-            value1.setSketch_packets((value1.getSketch_packets()+value2.getSketch_packets())/2);
-            value1.setSource_ip(groupUniqSourceIp(value1.getSource_ip(),value2.getSource_ip()));
-            return value1;
-        }
-    }
-
 }
diff --git a/src/main/java/com/zdjizhi/source/DosSketchSource.java b/src/main/java/com/zdjizhi/source/DosSketchSource.java
index f33f036..fc86c87 100644
--- a/src/main/java/com/zdjizhi/source/DosSketchSource.java
+++ b/src/main/java/com/zdjizhi/source/DosSketchSource.java
@@ -9,6 +9,9 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 
 import java.util.Properties;
 
+/**
+ * @author wlh
+ */
 public class DosSketchSource {
 
     private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;
diff --git a/src/main/java/com/zdjizhi/utils/CollectionUtils.java b/src/main/java/com/zdjizhi/utils/CollectionUtils.java
new file mode 100644
index 0000000..69d4592
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/CollectionUtils.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.utils;
+
+import java.util.Collection;
+import java.util.HashSet;
+
+/**
+ * @author wlh
+ * Extended collection-handling utilities
+ */
+public class CollectionUtils {
+
+    public static <T> Collection<T> takeUniqueLimit(Collection<T> collection, int limit){
+        int i =0;
+        Collection<T> newSet = new HashSet<>();
+        for (T t:collection){
+            if (i < limit){
+                newSet.add(t);
+                i += 1;
+            }
+        }
+        return newSet;
+    }
+}
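CollectionUtils.takeUniqueLimit is the helper that was private to OutputStreamSink (takeUniqLimit, removed above): it copies at most limit elements of any collection into a new HashSet. Because HashSet iteration order is unspecified, which elements survive the cap is arbitrary — acceptable for truncating an already-deduplicated source-IP list, as the removed groupUniqSourceIp did with CommonConfig.SOURCE_IP_LIST_LIMIT. A small usage sketch (the limit of 3 stands in for that config value):

    import com.zdjizhi.utils.CollectionUtils;

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;

    public class TakeUniqueLimitExample {
        public static void main(String[] args) {
            Set<String> sourceIps = new HashSet<>(Arrays.asList(
                    "192.168.50.23", "192.168.50.45", "192.168.56.9", "192.168.56.8"));
            Collection<String> capped = CollectionUtils.takeUniqueLimit(sourceIps, 3);
            System.out.println(capped.size()); // 3 (which three is unspecified)
        }
    }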
diff --git a/src/main/java/com/zdjizhi/utils/HbaseUtils.java b/src/main/java/com/zdjizhi/utils/HbaseUtils.java
index a6ef6c0..87921bd 100644
--- a/src/main/java/com/zdjizhi/utils/HbaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/HbaseUtils.java
@@ -1,5 +1,108 @@
 package com.zdjizhi.utils;
 
+import com.zdjizhi.common.CommonConfig;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * @author wlh
+ */
 public class HbaseUtils {
+    private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class);
+    private static Table table = null;
+    private static Scan scan = null;
+    public static Map<String, Map<String, List<Integer>>> baselineMap = new HashMap<>();
+
+    static {
+        readFromHbase();
+    }
+
+    private static void prepareHbaseEnv() throws IOException {
+        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
+
+        config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
+        config.set("hbase.client.retries.number", "3");
+        config.set("hbase.bulkload.retries.number", "3");
+        config.set("zookeeper.recovery.retry", "3");
+        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
+        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+
+        TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
+        Connection conn = ConnectionFactory.createConnection(config);
+        table = conn.getTable(tableName);
+        scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
+        logger.info("Connected to HBase, reading baseline data");
+    }
+
+    public static void main(String[] args) {
+        Set<String> keySet = baselineMap.keySet();
+        for (String key:keySet){
+            Map<String, List<Integer>> stringListMap = baselineMap.get(key);
+            Set<String> typeSet = stringListMap.keySet();
+            for (String type:typeSet){
+                List<Integer> lines = stringListMap.get(type);
+                if (lines != null){
+                    System.out.println(key+"--"+type+"--"+Arrays.toString(lines.toArray()));
+                }
+            }
+        }
+        System.out.println(baselineMap.size());
+    }
+
+    private static void readFromHbase(){
+        try {
+            prepareHbaseEnv();
+            logger.info("Start reading baseline data");
+            ResultScanner rs = table.getScanner(scan);
+            for (Result result : rs) {
+                Map<String, List<Integer>> floodTypeMap = new HashMap<>();
+                String rowkey = Bytes.toString(result.getRow());
+                ArrayList<Integer> tcp = getArraylist(result,"TCP SYN Flood", "session_rate");
+                ArrayList<Integer> udp = getArraylist(result,"UDP Flood", "session_rate");
+                ArrayList<Integer> icmp = getArraylist(result,"ICMP Flood", "session_rate");
+                ArrayList<Integer> dns = getArraylist(result,"DNS Amplification", "session_rate");
+                floodTypeMap.put("TCP SYN Flood",tcp);
+                floodTypeMap.put("UDP Flood",udp);
+                floodTypeMap.put("ICMP Flood",icmp);
+                floodTypeMap.put("DNS Amplification",dns);
+                baselineMap.put(rowkey,floodTypeMap);
+            }
+            logger.info("Baseline data formatted successfully, total IPs read: {}",baselineMap.size());
+        }catch (Exception e){
+            logger.error("Failed to read data from HBase",e);
+        }
+    }
+
+    private static ArrayList<Integer> getArraylist(Result result,String family,String qualifier) throws IOException {
+        if (!result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier))){
+            return null;
+        }
+        ArrayWritable w = new ArrayWritable(IntWritable.class);
+        w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
+        return fromWritable(w);
+    }
+
+    private static ArrayList<Integer> fromWritable(ArrayWritable writable) {
+        Writable[] writables = writable.get();
+        ArrayList<Integer> list = new ArrayList<>(writables.length);
+        for (Writable wrt : writables) {
+            list.add(((IntWritable)wrt).get());
+        }
+        return list;
+    }
+
 }
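HbaseUtils assumes each baseline cell holds a Hadoop ArrayWritable of IntWritable, which getArraylist deserializes via readFields. The job that writes those cells is not part of this diff; under the same assumption, a writer-side sketch would serialize a curve like this — reader and writer only have to agree on IntWritable as the element type:

    // Hypothetical writer-side sketch: produces bytes in the layout getArraylist() expects.
    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.IntWritable;

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;

    public class BaselineCellWriterSketch {
        static byte[] toCellBytes(List<Integer> baseline) throws IOException {
            IntWritable[] values = new IntWritable[baseline.size()];
            for (int i = 0; i < values.length; i++) {
                values[i] = new IntWritable(baseline.get(i));
            }
            ArrayWritable writable = new ArrayWritable(IntWritable.class, values);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writable.write(new DataOutputStream(bytes)); // inverse of readFields()
            return bytes.toByteArray();
        }
    }

The returned bytes would be the cell value for the attack-type column family ("TCP SYN Flood", "UDP Flood", ...) under the session_rate qualifier that readFromHbase() reads.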
diff --git a/src/main/java/com/zdjizhi/utils/IpUtils.java b/src/main/java/com/zdjizhi/utils/IpUtils.java
index 1468c04..7a7aae6 100644
--- a/src/main/java/com/zdjizhi/utils/IpUtils.java
+++ b/src/main/java/com/zdjizhi/utils/IpUtils.java
@@ -15,7 +15,13 @@ public class IpUtils {
             .build();
 
     public static void main(String[] args) {
-        System.out.println(ipLookup.countryLookup("61.128.159.186"));
+        System.out.println(ipLookup.countryLookup("94.23.23.52"));
+
+//        String ips = "192.168.50.23,192.168.50.45,192.168.56.9,192.168.56.8,192.168.50.58,192.168.56.7,192.168.56.6,192.168.50.40,192.168.50.19,192.168.50.6,192.168.50.4,192.168.56.17,192.168.50.27,192.168.50.26,192.168.50.18,192.168.56.3,192.168.56.10";
+//        for (String ip:ips.split(",")){
+//            System.out.println(ip+"--"+ipLookup.countryLookup(ip));
+//        }
+
     }
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index a9d8315..4a96a93 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -11,28 +11,34 @@ kafka.input.parallelism=1
 kafka.input.topic.name=DOS-SKETCH-LOG
 
 #Input Kafka broker address
-kafka.input.bootstrap.servers=192.168.44.12:9092
+#kafka.input.bootstrap.servers=192.168.44.12:9092
+kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
 
 #Kafka consumer group id
-kafka.input.group.id=2108041755
+kafka.input.group.id=2108161121
+#kafka.input.group.id=dos-detection-job-210813-1
 
 #Parallelism for sending Kafka metrics
 kafka.output.metric.parallelism=1
 
 #Topic name for Kafka metrics output
-kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS-LOG
+#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS-LOG
+kafka.output.metric.topic.name=test
 
 #Parallelism for sending Kafka events
 kafka.output.event.parallelism=1
 
 #Topic name for Kafka event output
-kafka.output.event.topic.name=DOS-EVENT-LOG
+#kafka.output.event.topic.name=DOS-EVENT-LOG
+kafka.output.event.topic.name=test
 
 #Output Kafka broker address
 kafka.output.bootstrap.servers=192.168.44.12:9092
+#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
 
 #ZooKeeper address
-hbase.zookeeper.quorum=192.168.44.12:2181
+#hbase.zookeeper.quorum=192.168.44.12:2181
+hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
 
 #HBase client operation timeout
 hbase.client.operation.timeout=30000
@@ -44,11 +50,11 @@ hbase.baseline.table.name=ddos_traffic_baselines
 #Limit on baseline rows to read
 hbase.baseline.total.num=1000000
 
-#Parallelism of the first aggregation, 4 keys
+#Parallelism of the aggregation, 2 keys
 flink.first.agg.parallelism=1
 
-#Parallelism of the second aggregation, 2 keys
-flink.second.agg.parallelism=1
+#Parallelism of the detection map
+flink.detection.map.parallelism=1
 
 #Watermark lateness
 flink.watermark.max.orderness=1
@@ -65,7 +71,9 @@ destination.ip.partition.num=10000
 data.center.id.num=15
 
 #Path to the IP mmdb database
-ip.mmdb.path=D:\\data\\dat_test\\
+#ip.mmdb.path=D:\\data\\dat_test\\
+ip.mmdb.path=D:\\data\\
+#ip.mmdb.path=/home/bigdata/topology/dat/
 #ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
 
 #Upper and lower bounds for baseline-based DoS attack detection