diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index 0985a11..63a1977 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -22,6 +22,7 @@ public class CommonConfig { public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.event.topic.name"); public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.output.bootstrap.servers"); + public static final int HBASE_INPUT_PARALLELISM = CommonConfigurations.getIntProperty("hbase.input.parallelism"); public static final String HBASE_ZOOKEEPER_QUORUM = CommonConfigurations.getStringProperty("hbase.zookeeper.quorum"); public static final int HBASE_CLIENT_OPERATION_TIMEOUT = CommonConfigurations.getIntProperty("hbase.client.operation.timeout"); public static final int HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = CommonConfigurations.getIntProperty("hbase.client.scanner.timeout.period"); @@ -29,6 +30,8 @@ public class CommonConfig { public static final String HBASE_BASELINE_TABLE_NAME = CommonConfigurations.getStringProperty("hbase.baseline.table.name"); public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num"); + public static final int FLINK_FIRST_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.first.agg.parallelism"); + public static final int FLINK_SECOND_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.second.agg.parallelism"); public static final int FLINK_WATERMARK_MAX_ORDERNESS = CommonConfigurations.getIntProperty("flink.watermark.max.orderness"); public static final int FLINK_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("flink.window.max.time"); diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java index 8330cf5..fe14071 100644 --- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java +++ b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java @@ -22,13 +22,11 @@ public class ParseSketchLog { private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class); public static SingleOutputStreamOperator getSketchSource(){ - return flatSketchSource(); + return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy()); } private static SingleOutputStreamOperator flatSketchSource(){ - return DosSketchSource.createDosSketchSourceByDatastream() - .flatMap(new flatSketchLog()) - .assignTimestampsAndWatermarks(createWatermarkStrategy()); + return DosSketchSource.createDosSketchSource().flatMap(new flatSketchLog()); } private static WatermarkStrategy createWatermarkStrategy(){ diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java index 953dd56..755d9bf 100644 --- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java +++ b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java @@ -63,6 +63,7 @@ public class OutputStreamSink { BroadcastStream>>> broadcast = FlinkEnvironmentUtils.streamExeEnv .addSource(new BaselineSource()) + .setParallelism(CommonConfig.HBASE_INPUT_PARALLELISM) .broadcast(descriptor); logger.info("广播变量加载成功!!"); @@ -70,14 +71,16 @@ public class OutputStreamSink { // .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME))) .reduce(new SecondReduceFunc()) .connect(broadcast) - .process(new DosDetection()); + .process(new DosDetection()) + .setParallelism(CommonConfig.FLINK_SECOND_AGG_PARALLELISM); } private static SingleOutputStreamOperator getMiddleStream(){ return ParseSketchLog.getSketchSource() .keyBy(new FirstKeySelector()) .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME))) - .process(new EtlProcessFunction()); + .process(new EtlProcessFunction()) + .setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM); } private static String groupUniqSourceIp(String sourceIp1,String sourceIp2){ diff --git a/src/main/java/com/zdjizhi/source/DosSketchSource.java b/src/main/java/com/zdjizhi/source/DosSketchSource.java index 7f080ba..f33f036 100644 --- a/src/main/java/com/zdjizhi/source/DosSketchSource.java +++ b/src/main/java/com/zdjizhi/source/DosSketchSource.java @@ -13,7 +13,7 @@ public class DosSketchSource { private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv; - public static DataStreamSource createDosSketchSourceByDatastream(){ + public static DataStreamSource createDosSketchSource(){ Properties properties = new Properties(); properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS); properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID); diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 76a18ab..bfdd73a 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -14,6 +14,7 @@ kafka.output.event.parallelism=1 kafka.output.event.topic.name=DOS-EVENT-LOG kafka.output.bootstrap.servers=192.168.44.12:9092 +hbase.input.parallelism=1 hbase.zookeeper.quorum=192.168.44.12:2181 hbase.client.operation.timeout=30000 hbase.client.scanner.timeout.period=30000 @@ -21,6 +22,8 @@ hbase.client.scanner.timeout.period=30000 hbase.baseline.table.name=ddos_traffic_baselines hbase.baseline.total.num=1000000 +flink.first.agg.parallelism=1 +flink.second.agg.parallelism=1 flink.watermark.max.orderness=1 flink.window.max.time=10