diff --git a/pom.xml b/pom.xml index 7625ada..add5055 100644 --- a/pom.xml +++ b/pom.xml @@ -179,7 +179,11 @@ provided - + + org.apache.datasketches + datasketches-java + 3.3.0 + diff --git a/src/main/java/com/galaxy/tsg/Toptask.java b/src/main/java/com/galaxy/tsg/Toptask.java index f0030bf..e6d6d0f 100644 --- a/src/main/java/com/galaxy/tsg/Toptask.java +++ b/src/main/java/com/galaxy/tsg/Toptask.java @@ -13,11 +13,13 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -225,6 +227,62 @@ public class Toptask { break; case 2: //datasketch + + + //Session_record top1000 21个窗口一并计算 + SingleOutputStreamOperator clientipdStream2 = inputForSession.filter(new FilterFunction() { + @Override + public boolean filter(Entity value) throws Exception { + return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol()); + } + }).assignTimestampsAndWatermarks(strategyForSession); + + + AllWindowedStream entityTimeWindowAllWindowedStream = clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))); + SingleOutputStreamOperator aggregate = entityTimeWindowAllWindowedStream.aggregate(new UserHashMapCountAgg5(), new UserCountWindowResult5()); + aggregate.print(); + + + + + + + //Security_record top 1000 1个窗口、proxy_record top 1000 1个窗口 + SingleOutputStreamOperator UrlStream2 = inputForUrl.filter(new FilterFunction() { + @Override + public boolean filter(UrlEntity value) throws Exception { + return StringUtil.isNotEmpty(value.getHttp_url()); + } + }).assignTimestampsAndWatermarks(strategyForSecurity); + + + AllWindowedStream urlEntityTimeWindowAllWindowedStream = UrlStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))); + SingleOutputStreamOperator aggregate1 = urlEntityTimeWindowAllWindowedStream.aggregate(new UserHashMapCountAgg6(), new UserCountWindowResult5()); + aggregate1.print(); + + + + //clientip聚合TOP + +// SingleOutputStreamOperator clientipdStream3 = inputForSession.filter(new FilterFunction() { +// @Override +// public boolean filter(Entity value) throws Exception { +// return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol()); +// } +// }).assignTimestampsAndWatermarks(strategyForSession); +// +// SingleOutputStreamOperator windowedStream3 = clientipdStream3.keyBy(new groupBySelector("common_client_ip")) +// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM); +// DataStream windoweddStream3 = windowedStream3.keyBy(new oneKeySelector()) +// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); +// windoweddStream3.print(); + + + + + + break; } diff --git a/src/main/java/com/galaxy/tsg/function/Dimension.java b/src/main/java/com/galaxy/tsg/function/Dimension.java new file mode 100644 index 0000000..bc5a683 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/Dimension.java @@ -0,0 +1,80 @@ +package com.galaxy.tsg.function; + + +import com.galaxy.tsg.pojo.Entity; +import com.galaxy.tsg.pojo.UrlEntity; + +/** + * @author fy + * @version 1.0 + * @date 2023/2/17 10:14 + * 各种维度 + */ +public class Dimension { + + public static final String ONE = "common_client_ip,common_vsys_id,common_device_group,common_data_center"; + public static final String TWO = "common_server_ip,common_vsys_id,common_device_group,common_data_center"; + public static final String THREE = "common_internal_ip,common_vsys_id,common_device_group,common_data_center"; + public static final String FOUR = "common_external_ip,common_vsys_id,common_device_group,common_data_center"; + public static final String FIVE = "http_domain,common_vsys_id,common_device_group,common_data_center"; + public static final String SIX = "common_subscriber_id,common_vsys_id,common_device_group,common_data_center"; + public static final String SEVEN = "common_app_label,common_vsys_id,common_device_group,common_data_center"; + + public static final String EIGHT = "http_url,common_vsys_id"; + + + + public static String setOneDimension(Entity cnRecordLog){ + + String oneDimension = cnRecordLog.getCommon_client_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); + return oneDimension; + } + + + public static String setTwoDimension(Entity cnRecordLog){ + + String twoDimension = cnRecordLog.getCommon_server_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); + return twoDimension; + } + + + public static String setThreeDimension(Entity cnRecordLog){ + + String threeDimension = cnRecordLog.getCommon_internal_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); + return threeDimension; + } + + + public static String setFourDimension(Entity cnRecordLog){ + + String fourDimension = cnRecordLog.getCommon_external_ip()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); + return fourDimension; + } + + public static String setFiveDimension(Entity cnRecordLog){ + + String fiveDimension = cnRecordLog.getHttp_domain()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); + return fiveDimension; + } + + + public static String setSixDimension(Entity cnRecordLog){ + + String sixDimension = cnRecordLog.getCommon_subscriber_id()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); + return sixDimension; + } + + + public static String setSevenDimension(Entity cnRecordLog){ + + String sevenDimension = cnRecordLog.getCommon_app_label()+","+cnRecordLog.getCommon_vsys_id()+","+cnRecordLog.getCommon_device_group()+","+cnRecordLog.getCommon_data_center(); + return sevenDimension; + } + + public static String setEightDimension(UrlEntity cnRecordLog){ + + String eightDimension = cnRecordLog.getHttp_url()+","+cnRecordLog.getCommon_vsys_id(); + return eightDimension; + } + +} diff --git a/src/main/java/com/galaxy/tsg/function/DimensionItemsSketch.java b/src/main/java/com/galaxy/tsg/function/DimensionItemsSketch.java new file mode 100644 index 0000000..457cbc2 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/DimensionItemsSketch.java @@ -0,0 +1,43 @@ +package com.galaxy.tsg.function; + + +import org.apache.datasketches.frequencies.ItemsSketch; + +/** + * @author fy + * @version 1.0 + * @date 2023/2/17 10:03 + * 维度+datasketches + */ +public class DimensionItemsSketch { + + private String dimension;//维度 + private ItemsSketch itemsSketch;//对应的 + + + public DimensionItemsSketch(String dimension, ItemsSketch itemsSketch) { + this.dimension = dimension; + this.itemsSketch = itemsSketch; + } + + + public String getDimension() { + return dimension; + } + + public void setDimension(String dimension) { + this.dimension = dimension; + } + + public ItemsSketch getItemsSketch() { + return itemsSketch; + } + + public void setItemsSketch(ItemsSketch itemsSketch) { + this.itemsSketch = itemsSketch; + } +} + + + + diff --git a/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java new file mode 100644 index 0000000..f7f9207 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java @@ -0,0 +1,79 @@ +package com.galaxy.tsg.function; + +import org.apache.datasketches.frequencies.ErrorType; +import org.apache.datasketches.frequencies.ItemsSketch; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * @author fy + * @version 1.0 + * @date 2023/3/7 15:50 + */ +public class UserCountWindowResult5 extends ProcessAllWindowFunction, String, TimeWindow> { + + + @Override + public void process(Context context, Iterable> iterable, Collector collector) throws Exception { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("-------------------------\n"); + stringBuilder.append("datasketches方法窗口结束时间:" + new Timestamp(context.window().getEnd()) + "\n"); + + + HashMap dataHashMap = iterable.iterator().next(); + + + System.out.println(dataHashMap.toString()); + + Set> entries = dataHashMap.entrySet(); + + + for(Map.Entry entry:entries){ + + System.out.println(entry.getKey()+""); + stringBuilder.append(entry.getKey()+"\n"); + + ItemsSketch.Row[] items = entry.getValue().getItemsSketch().getFrequentItems(ErrorType.NO_FALSE_POSITIVES); + + for (int i=0;i, HashMap> { + + + @Override + public HashMap createAccumulator() { + return new HashMap(1048576); + } + + @Override + public HashMap add(Entity cnRecordLog, HashMap stringItemsSketchHashMap) { + +// String dimension = cnRecordLog.getCommon_client_ip();//维度 +// System.out.println(dimension); + + if(stringItemsSketchHashMap.isEmpty()) { + + ItemsSketch oneSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch twoSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch threeSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch fourSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch fiveSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch sixSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + ItemsSketch sevenSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + + ItemsSketch onePktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch twoPktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch threePktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch fourPktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch fivePktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch sixPktItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch sevenPktItemsSketch = new ItemsSketch<>(1048576); + + + + ItemsSketch oneByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch twoByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch threeByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch fourByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch fiveByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch sixByteItemsSketch = new ItemsSketch<>(1048576); + ItemsSketch sevenByteItemsSketch = new ItemsSketch<>(1048576); + + + oneSessionItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + twoSessionItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + threeSessionItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + fourSessionItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + fiveSessionItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + sixSessionItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + sevenSessionItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + + onePktItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + twoPktItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + threePktItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + fourPktItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + fivePktItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + sixPktItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + sevenPktItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + + + oneByteItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + twoByteItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + threeByteItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + fourByteItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + fiveByteItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + sixByteItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + sevenByteItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + + + + DimensionItemsSketch oneSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE, oneSessionItemsSketch); + DimensionItemsSketch twoSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoSessionItemsSketch); + DimensionItemsSketch threeSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threeSessionItemsSketch); + DimensionItemsSketch fourSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourSessionItemsSketch); + DimensionItemsSketch fiveSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fiveSessionItemsSketch); + DimensionItemsSketch sixSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixSessionItemsSketch); + DimensionItemsSketch sevenSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenSessionItemsSketch); + + DimensionItemsSketch onePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE,onePktItemsSketch); + DimensionItemsSketch twoPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoPktItemsSketch); + DimensionItemsSketch threePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threePktItemsSketch); + DimensionItemsSketch fourPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourPktItemsSketch); + DimensionItemsSketch fivePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fivePktItemsSketch); + DimensionItemsSketch sixPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixPktItemsSketch); + DimensionItemsSketch sevenPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenPktItemsSketch); + + DimensionItemsSketch oneByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE,oneByteItemsSketch); + DimensionItemsSketch twoByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoByteItemsSketch); + DimensionItemsSketch threeByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threeByteItemsSketch); + DimensionItemsSketch fourByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourByteItemsSketch); + DimensionItemsSketch fiveByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fiveByteItemsSketch); + DimensionItemsSketch sixByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixByteItemsSketch); + DimensionItemsSketch sevenByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenByteItemsSketch); + + + stringItemsSketchHashMap.put("oneSession", oneSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("twoSession", twoSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("threeSession", threeSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("fourSession", fourSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("fiveSession", fiveSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("sixSession", sixSessionDimensionItemsSketch); + stringItemsSketchHashMap.put("sevenSession", sevenSessionDimensionItemsSketch); + + + stringItemsSketchHashMap.put("onePkt",onePktDimensionItemsSketch); + stringItemsSketchHashMap.put("twoPkt",twoPktDimensionItemsSketch); + stringItemsSketchHashMap.put("threePkt",threePktDimensionItemsSketch); + stringItemsSketchHashMap.put("fourPkt",fourPktDimensionItemsSketch); + stringItemsSketchHashMap.put("fivePkt",fivePktDimensionItemsSketch); + stringItemsSketchHashMap.put("sixPkt",sixPktDimensionItemsSketch); + stringItemsSketchHashMap.put("sevenPkt",sevenPktDimensionItemsSketch); + + + stringItemsSketchHashMap.put("oneByte",oneByteDimensionItemsSketch); + stringItemsSketchHashMap.put("twoByte",twoByteDimensionItemsSketch); + stringItemsSketchHashMap.put("threeByte",threeByteDimensionItemsSketch); + stringItemsSketchHashMap.put("fourByte",fourByteDimensionItemsSketch); + stringItemsSketchHashMap.put("fiveByte",fiveByteDimensionItemsSketch); + stringItemsSketchHashMap.put("sixByte",sixByteDimensionItemsSketch); + stringItemsSketchHashMap.put("sevenByte",sevenByteDimensionItemsSketch); + + + }else { + + stringItemsSketchHashMap.get("oneSession").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("twoSession").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("threeSession").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("fourSession").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("fiveSession").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("sixSession").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + stringItemsSketchHashMap.get("sevenSession").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + + stringItemsSketchHashMap.get("onePkt").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("twoPkt").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("threePkt").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("fourPkt").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("fivePkt").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("sixPkt").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get("sevenPkt").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num()); + + + stringItemsSketchHashMap.get("oneByte").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("twoByte").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("threeByte").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("fourByte").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("fiveByte").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("sixByte").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + stringItemsSketchHashMap.get("sevenByte").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num()); + + + + + + } + + + + return stringItemsSketchHashMap; + } + + @Override + public HashMap getResult(HashMap stringItemsSketchHashMap) { + + + return stringItemsSketchHashMap; + } + + @Override + public HashMap merge(HashMap stringItemsSketchHashMap, HashMap acc1) { + +// System.out.println("合并"); +// HashMap> unionSketchHashMap = new HashMap<>(); +// ItemsSketch sessionItemsSketch = new ItemsSketch<>(1048576); +// ItemsSketch pktItemsSketch = new ItemsSketch<>(1048576); +// ItemsSketch byteItemsSketch = new ItemsSketch<>(1048576); + +// ItemsSketch session_stringItemsSketch = stringItemsSketchHashMap.get("session"); +// ItemsSketch pkt_stringItemsSketch = stringItemsSketchHashMap.get("pkt"); +// ItemsSketch byte_stringItemsSketch = stringItemsSketchHashMap.get("byte"); + +// ItemsSketch session_acc1 = acc1.get("session"); +// ItemsSketch pkt_acc1 = acc1.get("pkt"); +// ItemsSketch byte_acc1 = acc1.get("byte"); + +// sessionItemsSketch.merge(session_stringItemsSketch); +// sessionItemsSketch.merge(session_acc1); + +// pktItemsSketch.merge(pkt_stringItemsSketch); +// pktItemsSketch.merge(pkt_acc1); +// +// byteItemsSketch.merge(byte_stringItemsSketch); +// byteItemsSketch.merge(byte_acc1); + +// unionSketchHashMap.put("session",sessionItemsSketch); +// unionSketchHashMap.put("pkt",pktItemsSketch); +// unionSketchHashMap.put("byte",byteItemsSketch); + + + return null; + } +} diff --git a/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg6.java b/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg6.java new file mode 100644 index 0000000..2005e1f --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg6.java @@ -0,0 +1,99 @@ +package com.galaxy.tsg.function; + + +import com.galaxy.tsg.pojo.Entity; +import com.galaxy.tsg.pojo.UrlEntity; +import org.apache.datasketches.frequencies.ItemsSketch; +import org.apache.flink.api.common.functions.AggregateFunction; + +import java.util.HashMap; + +/** + * @author fy + * @version 1.0 + * @date 2023/2/14 17:29 + * + *Session_record top10000 21个窗口计算 + */ +public class UserHashMapCountAgg6 implements AggregateFunction, HashMap> { + + + @Override + public HashMap createAccumulator() { + return new HashMap(1048576); + } + + @Override + public HashMap add(UrlEntity cnRecordLog, HashMap stringItemsSketchHashMap) { + +// String dimension = cnRecordLog.getCommon_client_ip();//维度 +// System.out.println(dimension); + + if(stringItemsSketchHashMap.isEmpty()) { + + ItemsSketch eightSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + + eightSessionItemsSketch.update(Dimension.setEightDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + + DimensionItemsSketch eightSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.EIGHT,eightSessionItemsSketch); + + stringItemsSketchHashMap.put("eightSession", eightSessionDimensionItemsSketch); + + + }else { + + + + stringItemsSketchHashMap.get("eightSession").getItemsSketch().update(Dimension.setEightDimension(cnRecordLog),cnRecordLog.getCommon_sessions()); + + + + + } + + + + return stringItemsSketchHashMap; + } + + @Override + public HashMap getResult(HashMap stringItemsSketchHashMap) { + + + return stringItemsSketchHashMap; + } + + @Override + public HashMap merge(HashMap stringItemsSketchHashMap, HashMap acc1) { + +// System.out.println("合并"); +// HashMap> unionSketchHashMap = new HashMap<>(); +// ItemsSketch sessionItemsSketch = new ItemsSketch<>(1048576); +// ItemsSketch pktItemsSketch = new ItemsSketch<>(1048576); +// ItemsSketch byteItemsSketch = new ItemsSketch<>(1048576); + +// ItemsSketch session_stringItemsSketch = stringItemsSketchHashMap.get("session"); +// ItemsSketch pkt_stringItemsSketch = stringItemsSketchHashMap.get("pkt"); +// ItemsSketch byte_stringItemsSketch = stringItemsSketchHashMap.get("byte"); + +// ItemsSketch session_acc1 = acc1.get("session"); +// ItemsSketch pkt_acc1 = acc1.get("pkt"); +// ItemsSketch byte_acc1 = acc1.get("byte"); + +// sessionItemsSketch.merge(session_stringItemsSketch); +// sessionItemsSketch.merge(session_acc1); + +// pktItemsSketch.merge(pkt_stringItemsSketch); +// pktItemsSketch.merge(pkt_acc1); +// +// byteItemsSketch.merge(byte_stringItemsSketch); +// byteItemsSketch.merge(byte_acc1); + +// unionSketchHashMap.put("session",sessionItemsSketch); +// unionSketchHashMap.put("pkt",pktItemsSketch); +// unionSketchHashMap.put("byte",byteItemsSketch); + + + return null; + } +} diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index f6a9a9d..13cda90 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -56,4 +56,4 @@ kafka.producer.compression.type=none kafka_producer_broker=192.168.44.12:9092 -tmp.test.type=1 \ No newline at end of file +tmp.test.type=2 \ No newline at end of file