添加 flink各算子并行度参数

This commit is contained in:
wanglihui
2021-08-02 18:11:29 +08:00
parent 830b0fbd2f
commit 504ee64fa7
5 changed files with 14 additions and 7 deletions

View File

@@ -22,6 +22,7 @@ public class CommonConfig {
public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.event.topic.name"); public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.event.topic.name");
public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.output.bootstrap.servers"); public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.output.bootstrap.servers");
public static final int HBASE_INPUT_PARALLELISM = CommonConfigurations.getIntProperty("hbase.input.parallelism");
public static final String HBASE_ZOOKEEPER_QUORUM = CommonConfigurations.getStringProperty("hbase.zookeeper.quorum"); public static final String HBASE_ZOOKEEPER_QUORUM = CommonConfigurations.getStringProperty("hbase.zookeeper.quorum");
public static final int HBASE_CLIENT_OPERATION_TIMEOUT = CommonConfigurations.getIntProperty("hbase.client.operation.timeout"); public static final int HBASE_CLIENT_OPERATION_TIMEOUT = CommonConfigurations.getIntProperty("hbase.client.operation.timeout");
public static final int HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = CommonConfigurations.getIntProperty("hbase.client.scanner.timeout.period"); public static final int HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = CommonConfigurations.getIntProperty("hbase.client.scanner.timeout.period");
@@ -29,6 +30,8 @@ public class CommonConfig {
public static final String HBASE_BASELINE_TABLE_NAME = CommonConfigurations.getStringProperty("hbase.baseline.table.name"); public static final String HBASE_BASELINE_TABLE_NAME = CommonConfigurations.getStringProperty("hbase.baseline.table.name");
public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num"); public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num");
public static final int FLINK_FIRST_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.first.agg.parallelism");
public static final int FLINK_SECOND_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.second.agg.parallelism");
public static final int FLINK_WATERMARK_MAX_ORDERNESS = CommonConfigurations.getIntProperty("flink.watermark.max.orderness"); public static final int FLINK_WATERMARK_MAX_ORDERNESS = CommonConfigurations.getIntProperty("flink.watermark.max.orderness");
public static final int FLINK_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("flink.window.max.time"); public static final int FLINK_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("flink.window.max.time");

View File

@@ -22,13 +22,11 @@ public class ParseSketchLog {
private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class); private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){ public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
return flatSketchSource(); return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy());
} }
private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){ private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
return DosSketchSource.createDosSketchSourceByDatastream() return DosSketchSource.createDosSketchSource().flatMap(new flatSketchLog());
.flatMap(new flatSketchLog())
.assignTimestampsAndWatermarks(createWatermarkStrategy());
} }
private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){ private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){

View File

@@ -63,6 +63,7 @@ public class OutputStreamSink {
BroadcastStream<Map<String, Map<String,List<Integer>>>> broadcast = FlinkEnvironmentUtils.streamExeEnv BroadcastStream<Map<String, Map<String,List<Integer>>>> broadcast = FlinkEnvironmentUtils.streamExeEnv
.addSource(new BaselineSource()) .addSource(new BaselineSource())
.setParallelism(CommonConfig.HBASE_INPUT_PARALLELISM)
.broadcast(descriptor); .broadcast(descriptor);
logger.info("广播变量加载成功!!"); logger.info("广播变量加载成功!!");
@@ -70,14 +71,16 @@ public class OutputStreamSink {
// .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME))) // .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
.reduce(new SecondReduceFunc()) .reduce(new SecondReduceFunc())
.connect(broadcast) .connect(broadcast)
.process(new DosDetection()); .process(new DosDetection())
.setParallelism(CommonConfig.FLINK_SECOND_AGG_PARALLELISM);
} }
private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){ private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){
return ParseSketchLog.getSketchSource() return ParseSketchLog.getSketchSource()
.keyBy(new FirstKeySelector()) .keyBy(new FirstKeySelector())
.window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME))) .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
.process(new EtlProcessFunction()); .process(new EtlProcessFunction())
.setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM);
} }
private static String groupUniqSourceIp(String sourceIp1,String sourceIp2){ private static String groupUniqSourceIp(String sourceIp1,String sourceIp2){

View File

@@ -13,7 +13,7 @@ public class DosSketchSource {
private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv; private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;
public static DataStreamSource<String> createDosSketchSourceByDatastream(){ public static DataStreamSource<String> createDosSketchSource(){
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS); properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID); properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);

View File

@@ -14,6 +14,7 @@ kafka.output.event.parallelism=1
kafka.output.event.topic.name=DOS-EVENT-LOG kafka.output.event.topic.name=DOS-EVENT-LOG
kafka.output.bootstrap.servers=192.168.44.12:9092 kafka.output.bootstrap.servers=192.168.44.12:9092
hbase.input.parallelism=1
hbase.zookeeper.quorum=192.168.44.12:2181 hbase.zookeeper.quorum=192.168.44.12:2181
hbase.client.operation.timeout=30000 hbase.client.operation.timeout=30000
hbase.client.scanner.timeout.period=30000 hbase.client.scanner.timeout.period=30000
@@ -21,6 +22,8 @@ hbase.client.scanner.timeout.period=30000
hbase.baseline.table.name=ddos_traffic_baselines hbase.baseline.table.name=ddos_traffic_baselines
hbase.baseline.total.num=1000000 hbase.baseline.total.num=1000000
flink.first.agg.parallelism=1
flink.second.agg.parallelism=1
flink.watermark.max.orderness=1 flink.watermark.max.orderness=1
flink.window.max.time=10 flink.window.max.time=10