datasketch部分代码采用reduce方式

This commit is contained in:
fengyi
2023-03-13 10:25:06 +08:00
parent 455f390387
commit f9c33dd93c
7 changed files with 873 additions and 538 deletions

View File

@@ -228,8 +228,8 @@ public class Toptask {
case 2:
//datasketch
//clientip聚合TOP
SingleOutputStreamOperator<Entity> clientipdStream2 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
@@ -237,24 +237,12 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("oneSession"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("onePkt"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("oneByte"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
SingleOutputStreamOperator<ResultEntity> windowedStream2 = clientipdStream2.keyBy(new groupBySelector("common_client_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> windoweddStream2 = windowedStream2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
windoweddStream2.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
//serverip聚合TOP
@@ -265,21 +253,12 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
serveripdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("twoSession"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
serveripdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("twoPkt"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
serveripdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("twoByte"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
SingleOutputStreamOperator<ResultEntity> windowedStreamForServerIp2 = serveripdStream2.keyBy(new groupBySelector("common_server_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_server_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> windoweddStreamForServerIp2 = windowedStreamForServerIp2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
windoweddStreamForServerIp2.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
//common_internal_ip聚合TOP
@@ -290,22 +269,12 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
internalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("threeSession"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
internalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("threePkt"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
internalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("threeByte"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
SingleOutputStreamOperator<ResultEntity> windowedStreamForInternal2 = internalStream2.keyBy(new groupBySelector("common_internal_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_internal_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForInternal2 = windowedStreamForInternal2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForInternal2.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
//common_external_ip聚合TOP
@@ -316,22 +285,12 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
externalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("fourSession"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
externalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("fourPkt"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
externalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("fourByte"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
SingleOutputStreamOperator<ResultEntity> windowedStreamForExternal2 = externalStream2.keyBy(new groupBySelector("common_external_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_external_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForExternal2 = windowedStreamForExternal2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForExternal2.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
//http_domain聚合TOP
@@ -342,24 +301,13 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForDomain2 = domainStream2.keyBy(new groupBySelector("http_domain"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "http_domain")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForDomain2 = windowedStreamForDomain2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForDomain2.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
domainStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("fiveSession"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
domainStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("fivePkt"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
domainStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("fiveByte"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
//common_subscriber_id聚合TOP
SingleOutputStreamOperator<Entity> userStream2 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
@@ -367,20 +315,13 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
userStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("sixSession"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-USER")).setParallelism(3);
userStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("sixPkt"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-USER")).setParallelism(3);
userStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("sixByte"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-USER")).setParallelism(3);
//common_subscriber_id聚合TOP
SingleOutputStreamOperator<ResultEntity> windowedStreamForUser2 = userStream2.keyBy(new groupBySelector("common_subscriber_id"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_subscriber_id")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForUser2 = windowedStreamForUser2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForUser2.addSink(getKafkaSink("TOP-USER")).setParallelism(3);
//common_app_label聚合求全量
@@ -392,26 +333,14 @@ public class Toptask {
}).assignTimestampsAndWatermarks(strategyForSession);
appNameStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("sevenSession"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
appNameStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("sevenPkt"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
appNameStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate("sevenByte"), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
appNameStream2.keyBy(new groupBySelector("common_app_label"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculateForApp()).addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
//Security_record top 1000 1个窗口、proxy_record top 1000 1个窗口
//url聚合session求top
SingleOutputStreamOperator<UrlEntity> UrlStream2 = inputForUrl.filter(new FilterFunction<UrlEntity>() {
@Override
public boolean filter(UrlEntity value) throws Exception {
@@ -420,29 +349,12 @@ public class Toptask {
}).assignTimestampsAndWatermarks(strategyForSecurity);
UrlStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForUrlAggregate(), new UserCountWindowResult5())
// .print()
.addSink(getKafkaSink("TOP-URLS")).setParallelism(3);
//clientip聚合TOP
// SingleOutputStreamOperator<Entity> clientipdStream3 = inputForSession.filter(new FilterFunction<Entity>() {
// @Override
// public boolean filter(Entity value) throws Exception {
// return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
// }
// }).assignTimestampsAndWatermarks(strategyForSession);
//
// SingleOutputStreamOperator<ResultEntity> windowedStream3 = clientipdStream3.keyBy(new groupBySelector("common_client_ip"))
// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM);
// DataStream<String> windoweddStream3 = windowedStream3.keyBy(new oneKeySelector())
// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
// windoweddStream3.print();
SingleOutputStreamOperator<ResultEntity> windowedStreamForUrl2 = UrlStream2.keyBy(new twoKeySelector())
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new UrlAggregationReduce(), new DatasketchUrlCalculate(URL_TOP_LIMIT)).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForUrl2 = windowedStreamForUrl2.keyBy(new oneKeySelector())
.process(new TopNHotItemsForUrl(URL_TOP_LIMIT)).setParallelism(1);
WindoweddStreamForUrl2.addSink(getKafkaSink("TOP-URLS")).setParallelism(3);