初步增加datasketch方法处理topn

This commit is contained in:
fengyi
2023-03-08 14:47:09 +08:00
parent 4965ac0231
commit dc1f5a8af5
8 changed files with 582 additions and 2 deletions

View File

@@ -13,11 +13,13 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -225,6 +227,62 @@ public class Toptask {
break;
case 2:
//datasketch
//Session_record top1000 21个窗口一并计算
SingleOutputStreamOperator<Entity> clientipdStream2 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
}
}).assignTimestampsAndWatermarks(strategyForSession);
AllWindowedStream<Entity, TimeWindow> entityTimeWindowAllWindowedStream = clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)));
SingleOutputStreamOperator<String> aggregate = entityTimeWindowAllWindowedStream.aggregate(new UserHashMapCountAgg5(), new UserCountWindowResult5());
aggregate.print();
//Security_record top 1000 1个窗口、proxy_record top 1000 1个窗口
SingleOutputStreamOperator<UrlEntity> UrlStream2 = inputForUrl.filter(new FilterFunction<UrlEntity>() {
@Override
public boolean filter(UrlEntity value) throws Exception {
return StringUtil.isNotEmpty(value.getHttp_url());
}
}).assignTimestampsAndWatermarks(strategyForSecurity);
AllWindowedStream<UrlEntity, TimeWindow> urlEntityTimeWindowAllWindowedStream = UrlStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)));
SingleOutputStreamOperator<String> aggregate1 = urlEntityTimeWindowAllWindowedStream.aggregate(new UserHashMapCountAgg6(), new UserCountWindowResult5());
aggregate1.print();
//clientip聚合TOP
// SingleOutputStreamOperator<Entity> clientipdStream3 = inputForSession.filter(new FilterFunction<Entity>() {
// @Override
// public boolean filter(Entity value) throws Exception {
// return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
// }
// }).assignTimestampsAndWatermarks(strategyForSession);
//
// SingleOutputStreamOperator<ResultEntity> windowedStream3 = clientipdStream3.keyBy(new groupBySelector("common_client_ip"))
// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM);
// DataStream<String> windoweddStream3 = windowedStream3.keyBy(new oneKeySelector())
// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
// windoweddStream3.print();
break;
}