优化算法3

This commit is contained in:
kingwide
2023-03-16 10:27:03 +08:00
parent b0ac752af6
commit d7012eb805
4 changed files with 149 additions and 3 deletions

View File

@@ -13,13 +13,11 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,6 +114,9 @@ public class Toptask {
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
windoweddStream.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
//serverip聚合TOP
SingleOutputStreamOperator<Entity> serveripdStream = inputForSession.filter(new FilterFunction<Entity>() {
@@ -384,7 +385,131 @@ public class Toptask {
break;
case 3:
SingleOutputStreamOperator<Entity> clientipdStream3 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStream3 = clientipdStream3.keyBy(new groupBySelector("common_client_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> windoweddStream3 = windowedStream3.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(1);
windoweddStream3.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(1);
SingleOutputStreamOperator<Entity> serveripdStream3 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForServerIp3 = serveripdStream3.keyBy(new groupBySelector("common_server_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "common_server_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> windoweddStreamForServerIp3 = windowedStreamForServerIp3.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(1);
windoweddStreamForServerIp3.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(1);
//common_internal_ip聚合TOP
SingleOutputStreamOperator<Entity> internalStream3 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return StringUtil.isNotEmpty(value.getCommon_internal_ip());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForInternal3 = internalStream3.keyBy(new groupBySelector("common_internal_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "common_internal_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForInternal3 = windowedStreamForInternal3.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(1);
WindoweddStreamForInternal3.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(1);
//common_external_ip聚合TOP
SingleOutputStreamOperator<Entity> externalStream3 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return StringUtil.isNotEmpty(value.getCommon_external_ip());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForExternal3= externalStream3.keyBy(new groupBySelector("common_external_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "common_external_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForExternal3 = windowedStreamForExternal3.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(1);
WindoweddStreamForExternal3.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(1);
//http_domain聚合TOP
SingleOutputStreamOperator<Entity> domainStream3 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return StringUtil.isNotEmpty(value.getHttp_domain());
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForDomain3 = domainStream3.keyBy(new groupBySelector("http_domain"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "http_domain")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForDomain3 = windowedStreamForDomain3.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(1);
WindoweddStreamForDomain3.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(1);
SingleOutputStreamOperator<Entity> userStream3 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return StringUtil.isNotEmpty(value.getCommon_subscriber_id());
}
}).assignTimestampsAndWatermarks(strategyForSession);
//common_subscriber_id聚合TOP
SingleOutputStreamOperator<ResultEntity> windowedStreamForUser3 = userStream3.keyBy(new groupBySelector("common_subscriber_id"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new metricsAggregation(TOP_LIMIT), new metricsCalculateTest(TOP_LIMIT, "common_subscriber_id")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForUser3 = windowedStreamForUser3.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(1);
WindoweddStreamForUser3.addSink(getKafkaSink("TOP-USER")).setParallelism(1);
SingleOutputStreamOperator<Entity> appNameStream3 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return StringUtil.isNotEmpty(value.getCommon_app_label());
}
}).assignTimestampsAndWatermarks(strategyForSession);
//common_app_label聚合求全量
appNameStream3.keyBy(new groupBySelector("common_app_label"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculateForApp()).addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
SingleOutputStreamOperator<UrlEntity> UrlStream3 = inputForUrl.filter(new FilterFunction<UrlEntity>() {
@Override
public boolean filter(UrlEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getHttp_url());
}
}).assignTimestampsAndWatermarks(strategyForSecurity);
//url聚合session求top
SingleOutputStreamOperator<ResultEntity> windowedStreamForUrl3 = UrlStream3.keyBy(new twoKeySelector())
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new UrlAggregationReduce(), new metricsCalculateForUrl(URL_TOP_LIMIT)).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForUrl3 = windowedStreamForUrl3.keyBy(new oneKeySelector())
.process(new TopNHotItemsForUrl(URL_TOP_LIMIT)).setParallelism(1);
WindoweddStreamForUrl3.addSink(getKafkaSink("TOP-URLS")).setParallelism(1);
break;