diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml
new file mode 100644
index 0000000..478178b
--- /dev/null
+++ b/dependency-reduced-pom.xml
@@ -0,0 +1,100 @@
+
+
+ 4.0.0
+ com.galaxy.tsg
+ flink-top-task
+ 22-02-22
+
+
+
+ maven-compiler-plugin
+ 3.8.0
+
+ 1.8
+ 1.8
+ false
+
+ -Xpkginfo:always
+
+
+
+
+ maven-shade-plugin
+
+
+ flink-top-task
+ package
+
+ shade
+
+
+ flink-top-task-23-02-22
+
+
+ *:*
+
+ META-INF
+
+
+
+
+
+ com.galaxy.tsg.Toptask
+
+
+
+
+
+
+
+
+
+
+ nexus
+ Team Nexus Repository
+ http://192.168.40.125:8099/content/groups/public
+
+
+
+
+ org.apache.flink
+ flink-streaming-java_2.12
+ 1.13.1
+ provided
+
+
+ org.apache.flink
+ flink-java
+ 1.13.1
+ provided
+
+
+ org.apache.flink
+ flink-core
+ 1.13.1
+ provided
+
+
+ org.apache.flink
+ flink-json
+ 1.13.1
+ provided
+
+
+ org.apache.flink
+ flink-csv
+ 1.13.1
+ provided
+
+
+ org.apache.flink
+ flink-clients_2.11
+ 1.13.1
+ provided
+
+
+
+ 1.13.1
+ 2.7.1
+
+
diff --git a/flink-top-task.iml b/flink-top-task.iml
new file mode 100644
index 0000000..425046c
--- /dev/null
+++ b/flink-top-task.iml
@@ -0,0 +1,173 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/galaxy/tsg/Toptask.java b/src/main/java/com/galaxy/tsg/Toptask.java
index e6d6d0f..1841988 100644
--- a/src/main/java/com/galaxy/tsg/Toptask.java
+++ b/src/main/java/com/galaxy/tsg/Toptask.java
@@ -229,7 +229,7 @@ public class Toptask {
//datasketch
- //Session_record top1000 21个窗口一并计算
+ //clientip聚合TOP
SingleOutputStreamOperator clientipdStream2 = inputForSession.filter(new FilterFunction() {
@Override
public boolean filter(Entity value) throws Exception {
@@ -238,10 +238,174 @@ public class Toptask {
}).assignTimestampsAndWatermarks(strategyForSession);
- AllWindowedStream entityTimeWindowAllWindowedStream = clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)));
- SingleOutputStreamOperator aggregate = entityTimeWindowAllWindowedStream.aggregate(new UserHashMapCountAgg5(), new UserCountWindowResult5());
- aggregate.print();
+ clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("oneSession"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
+ clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("onePkt"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
+
+
+ clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("oneByte"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
+
+
+
+ //serverip聚合TOP
+
+ SingleOutputStreamOperator serveripdStream2 = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+ serveripdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("twoSession"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
+
+ serveripdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("twoPkt"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
+
+ serveripdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("twoByte"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
+
+
+
+ //common_internal_ip聚合TOP
+ SingleOutputStreamOperator internalStream2 = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getCommon_internal_ip());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+ internalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("threeSession"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
+
+ internalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("threePkt"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
+
+ internalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("threeByte"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
+
+
+
+ //common_external_ip聚合TOP
+
+ SingleOutputStreamOperator externalStream2 = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getCommon_external_ip());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+ externalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("fourSession"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
+
+ externalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("fourPkt"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
+
+ externalStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("fourByte"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
+
+
+
+ //http_domain聚合TOP
+
+ SingleOutputStreamOperator domainStream2 = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getHttp_domain());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+
+ domainStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("fiveSession"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
+
+ domainStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("fivePkt"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
+
+ domainStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("fiveByte"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
+
+
+ //common_subscriber_id聚合TOP
+ SingleOutputStreamOperator userStream2 = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getCommon_subscriber_id());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+ userStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("sixSession"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-USER")).setParallelism(3);
+
+ userStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("sixPkt"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-USER")).setParallelism(3);
+
+ userStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("sixByte"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-USER")).setParallelism(3);
+
+
+ //common_app_label聚合求全量
+ SingleOutputStreamOperator appNameStream2 = inputForSession.filter(new FilterFunction() {
+ @Override
+ public boolean filter(Entity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getCommon_app_label());
+ }
+ }).assignTimestampsAndWatermarks(strategyForSession);
+
+
+ appNameStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("sevenSession"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
+
+ appNameStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("sevenPkt"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
+
+ appNameStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForMetricsAggregate("sevenByte"), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
@@ -256,9 +420,11 @@ public class Toptask {
}).assignTimestampsAndWatermarks(strategyForSecurity);
- AllWindowedStream urlEntityTimeWindowAllWindowedStream = UrlStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)));
- SingleOutputStreamOperator aggregate1 = urlEntityTimeWindowAllWindowedStream.aggregate(new UserHashMapCountAgg6(), new UserCountWindowResult5());
- aggregate1.print();
+ UrlStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
+ .aggregate(new DatasketchForUrlAggregate(), new UserCountWindowResult5())
+// .print()
+ .addSink(getKafkaSink("TOP-URLS")).setParallelism(3);
+
diff --git a/src/main/java/com/galaxy/tsg/function/CollectionValue.java b/src/main/java/com/galaxy/tsg/function/CollectionValue.java
new file mode 100644
index 0000000..e8bd256
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/CollectionValue.java
@@ -0,0 +1,191 @@
+package com.galaxy.tsg.function;
+
+import com.alibaba.fastjson.JSONObject;
+import com.galaxy.tsg.pojo.Entity;
+import com.galaxy.tsg.pojo.UrlEntity;
+
+/**
+ * @author fy
+ * @version 1.0
+ * @date 2023/3/8 19:46
+ * 同指标集合成的值
+ */
+public class CollectionValue {
+
+ public long c2s_byte_num;
+ public long c2s_pkt_num;
+ public long s2c_byte_num ;
+ public long s2c_pkt_num ;
+ public long session_num ;
+ public long stat_time ;
+ public long vsys_id ;
+ public String data_center;
+ public String device_group;
+ public String order_by;
+ public String source;
+
+ public CollectionValue(long c2s_byte_num, long c2s_pkt_num, long s2c_byte_num, long s2c_pkt_num, long session_num, long stat_time, long vsys_id, String data_center, String device_group, String order_by, String source) {
+ this.c2s_byte_num = c2s_byte_num;
+ this.c2s_pkt_num = c2s_pkt_num;
+ this.s2c_byte_num = s2c_byte_num;
+ this.s2c_pkt_num = s2c_pkt_num;
+ this.session_num = session_num;
+ this.stat_time = stat_time;
+ this.vsys_id = vsys_id;
+ this.data_center = data_center;
+ this.device_group = device_group;
+ this.order_by = order_by;
+ this.source = source;
+ }
+
+
+ public CollectionValue(Entity entity,String key ) {
+
+
+
+ this.c2s_byte_num = entity.getCommon_c2s_byte_num();
+ this.c2s_pkt_num = entity.getCommon_c2s_pkt_num();
+ this.s2c_byte_num = entity.getCommon_s2c_byte_num();
+ this.s2c_pkt_num = entity.getCommon_s2c_pkt_num();
+ this.session_num = entity.getCommon_sessions();
+ this.stat_time = System.currentTimeMillis() / 1000;
+ this.vsys_id = entity.getCommon_vsys_id();
+ this.data_center = entity.getCommon_data_center();
+ this.device_group = entity.getCommon_device_group();
+ this.order_by = key;
+// this.source = source;//分情况
+
+ }
+
+
+
+ public CollectionValue getCollectionValue(CollectionValue collectionValue,Entity entity){
+
+ collectionValue.setC2s_byte_num(collectionValue.getC2s_byte_num()+entity.getCommon_c2s_byte_num());
+
+ collectionValue.setC2s_pkt_num(collectionValue.getC2s_pkt_num()+entity.getCommon_c2s_pkt_num());
+
+ collectionValue.setS2c_byte_num(collectionValue.getS2c_byte_num()+entity.getCommon_s2c_byte_num());
+
+ collectionValue.setS2c_pkt_num(collectionValue.getS2c_pkt_num()+entity.getCommon_s2c_pkt_num());
+
+ collectionValue.setSession_num(collectionValue.getSession_num()+entity.getCommon_sessions());
+
+ return collectionValue;
+
+ }
+
+
+
+
+
+
+
+
+ public long getC2s_byte_num() {
+ return c2s_byte_num;
+ }
+
+ public void setC2s_byte_num(long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public long getC2s_pkt_num() {
+ return c2s_pkt_num;
+ }
+
+ public void setC2s_pkt_num(long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public long getS2c_byte_num() {
+ return s2c_byte_num;
+ }
+
+ public void setS2c_byte_num(long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public long getS2c_pkt_num() {
+ return s2c_pkt_num;
+ }
+
+ public void setS2c_pkt_num(long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public long getSession_num() {
+ return session_num;
+ }
+
+ public void setSession_num(long session_num) {
+ this.session_num = session_num;
+ }
+
+ public long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(long stat_time) {
+ this.stat_time = stat_time;
+ }
+
+ public long getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(long vsys_id) {
+ this.vsys_id = vsys_id;
+ }
+
+ public String getData_center() {
+ return data_center;
+ }
+
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+ public String getOrder_by() {
+ return order_by;
+ }
+
+ public void setOrder_by(String order_by) {
+ this.order_by = order_by;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+
+ }
+
+
+ @Override
+ public String toString() {
+ return "CollectionValue{" +
+ "c2s_byte_num=" + c2s_byte_num +
+ ", c2s_pkt_num=" + c2s_pkt_num +
+ ", s2c_byte_num=" + s2c_byte_num +
+ ", s2c_pkt_num=" + s2c_pkt_num +
+ ", session_num=" + session_num +
+ ", stat_time=" + stat_time +
+ ", vsys_id=" + vsys_id +
+ ", data_center='" + data_center + '\'' +
+ ", device_group='" + device_group + '\'' +
+ ", order_by='" + order_by + '\'' +
+ ", source='" + source + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/function/DatasketchForMetricsAggregate.java b/src/main/java/com/galaxy/tsg/function/DatasketchForMetricsAggregate.java
new file mode 100644
index 0000000..4883642
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/DatasketchForMetricsAggregate.java
@@ -0,0 +1,1116 @@
+package com.galaxy.tsg.function;
+
+
+import com.galaxy.tsg.pojo.Entity;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+import java.util.HashMap;
+
+/**
+ * @author fy
+ * @version 1.0
+ * @date 2023/2/14 17:29
+ *
+ *Session_record top10000 21个窗口计算
+ */
+public class DatasketchForMetricsAggregate implements AggregateFunction, HashMap> {
+
+ private String key ;
+
+
+ public DatasketchForMetricsAggregate(String key ){
+
+ this.key = key;
+ }
+
+
+ @Override
+ public HashMap createAccumulator() {
+ return new HashMap(1048576);
+ }
+
+ @Override
+ public HashMap add(Entity cnRecordLog, HashMap stringItemsSketchHashMap) {
+
+// String dimension = cnRecordLog.getCommon_client_ip();//维度
+// System.out.println(dimension);
+
+ switch (key){
+ case "oneSession":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch oneSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+
+ oneSessionItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_client_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setOneDimension(cnRecordLog),collectionValue);
+
+ DimensionItemsSketch oneSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE, oneSessionItemsSketch,stringCollectionValueHashMap);
+
+
+ stringItemsSketchHashMap.put("oneSession", oneSessionDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("oneSession").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+ CollectionValue oneSession = stringItemsSketchHashMap.get("oneSession").getStringCollectionValueHashMap().get(Dimension.setOneDimension(cnRecordLog));//从key获取集合
+
+ if (oneSession==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_client_ip());
+
+ stringItemsSketchHashMap.get("oneSession").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ oneSession.getCollectionValue(oneSession,cnRecordLog);
+
+ stringItemsSketchHashMap.get("oneSession").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),oneSession);
+
+
+ }
+
+
+ }
+
+ break;
+ case "twoSession":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch twoSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+
+ twoSessionItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_server_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setTwoDimension(cnRecordLog),collectionValue);
+
+
+ DimensionItemsSketch twoSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoSessionItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("twoSession", twoSessionDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("twoSession").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+ CollectionValue twoSession = stringItemsSketchHashMap.get("twoSession").getStringCollectionValueHashMap().get(Dimension.setTwoDimension(cnRecordLog));
+
+
+ if (twoSession==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_server_ip());
+
+ stringItemsSketchHashMap.get("twoSession").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ twoSession.getCollectionValue(twoSession,cnRecordLog);
+
+ stringItemsSketchHashMap.get("twoSession").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),twoSession);
+
+
+ }
+
+
+
+
+
+ }
+ break;
+ case "threeSession":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch threeSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+
+ threeSessionItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_internal_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setThreeDimension(cnRecordLog),collectionValue);
+
+
+
+ DimensionItemsSketch threeSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threeSessionItemsSketch,stringCollectionValueHashMap);
+
+
+ stringItemsSketchHashMap.put("threeSession", threeSessionDimensionItemsSketch);
+
+ }else {
+
+ stringItemsSketchHashMap.get("threeSession").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+
+ CollectionValue threeSession = stringItemsSketchHashMap.get("threeSession").getStringCollectionValueHashMap().get(Dimension.setThreeDimension(cnRecordLog));
+
+
+ if (threeSession==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_server_ip());
+
+ stringItemsSketchHashMap.get("threeSession").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ threeSession.getCollectionValue(threeSession,cnRecordLog);
+
+ stringItemsSketchHashMap.get("threeSession").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),threeSession);
+
+
+ }
+
+
+
+
+
+ }
+ break;
+ case "fourSession":
+
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch fourSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+
+ fourSessionItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_external_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setFourDimension(cnRecordLog),collectionValue);
+
+
+ DimensionItemsSketch fourSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourSessionItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("fourSession", fourSessionDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("fourSession").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+
+
+ CollectionValue fourSession = stringItemsSketchHashMap.get("fourSession").getStringCollectionValueHashMap().get(Dimension.setFourDimension(cnRecordLog));
+
+
+ if (fourSession==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_external_ip());
+
+ stringItemsSketchHashMap.get("fourSession").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ fourSession.getCollectionValue(fourSession,cnRecordLog);
+
+ stringItemsSketchHashMap.get("fourSession").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),fourSession);
+
+
+ }
+
+
+
+
+
+ }
+ break;
+ case "fiveSession":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch fiveSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+
+ fiveSessionItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getHttp_domain());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setFiveDimension(cnRecordLog),collectionValue);
+
+
+
+
+ DimensionItemsSketch fiveSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fiveSessionItemsSketch,stringCollectionValueHashMap);
+
+
+ stringItemsSketchHashMap.put("fiveSession", fiveSessionDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("fiveSession").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+
+ CollectionValue fiveSession = stringItemsSketchHashMap.get("fiveSession").getStringCollectionValueHashMap().get(Dimension.setFiveDimension(cnRecordLog));
+
+
+ if (fiveSession==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getHttp_domain());
+
+ stringItemsSketchHashMap.get("fiveSession").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ fiveSession.getCollectionValue(fiveSession,cnRecordLog);
+
+ stringItemsSketchHashMap.get("fiveSession").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),fiveSession);
+
+
+ }
+
+
+
+
+ }
+ break;
+ case "sixSession":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch sixSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+
+ sixSessionItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_subscriber_id());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setSixDimension(cnRecordLog),collectionValue);
+
+
+
+ DimensionItemsSketch sixSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixSessionItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("sixSession", sixSessionDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("sixSession").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+
+ CollectionValue sixSession = stringItemsSketchHashMap.get("sixSession").getStringCollectionValueHashMap().get(Dimension.setSixDimension(cnRecordLog));
+
+
+ if (sixSession==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_subscriber_id());
+
+ stringItemsSketchHashMap.get("sixSession").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ sixSession.getCollectionValue(sixSession,cnRecordLog);
+
+ stringItemsSketchHashMap.get("sixSession").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),sixSession);
+
+
+ }
+
+
+
+
+
+
+
+ }
+ break;
+ case "sevenSession":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch sevenSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+
+ sevenSessionItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_app_label());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setSevenDimension(cnRecordLog),collectionValue);
+
+
+
+
+ DimensionItemsSketch sevenSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenSessionItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("sevenSession", sevenSessionDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("sevenSession").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+
+ CollectionValue sevenSession = stringItemsSketchHashMap.get("sevenSession").getStringCollectionValueHashMap().get(Dimension.setSevenDimension(cnRecordLog));
+
+
+ if (sevenSession==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"sessions");
+ collectionValue.setSource(cnRecordLog.getCommon_app_label());
+
+ stringItemsSketchHashMap.get("sevenSession").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ sevenSession.getCollectionValue(sevenSession,cnRecordLog);
+
+ stringItemsSketchHashMap.get("sevenSession").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),sevenSession);
+
+
+ }
+
+
+
+
+ }
+ break;
+ case "onePkt":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch onePktItemsSketch = new ItemsSketch<>(1048576);
+
+ onePktItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_client_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setOneDimension(cnRecordLog),collectionValue);
+
+
+
+ DimensionItemsSketch onePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE,onePktItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("onePkt",onePktDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("onePkt").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+ CollectionValue onePkt = stringItemsSketchHashMap.get("onePkt").getStringCollectionValueHashMap().get(Dimension.setOneDimension(cnRecordLog));//从key获取集合
+
+ if (onePkt==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_client_ip());
+
+ stringItemsSketchHashMap.get("onePkt").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ onePkt.getCollectionValue(onePkt,cnRecordLog);
+
+ stringItemsSketchHashMap.get("onePkt").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),onePkt);
+
+
+ }
+
+
+
+
+ }
+ break;
+ case "twoPkt":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch twoPktItemsSketch = new ItemsSketch<>(1048576);
+
+ twoPktItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_server_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setTwoDimension(cnRecordLog),collectionValue);
+
+
+
+
+ DimensionItemsSketch twoPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoPktItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("twoPkt",twoPktDimensionItemsSketch);
+
+ }else {
+
+ stringItemsSketchHashMap.get("twoPkt").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+
+ CollectionValue twoPkt = stringItemsSketchHashMap.get("twoPkt").getStringCollectionValueHashMap().get(Dimension.setTwoDimension(cnRecordLog));
+
+
+ if (twoPkt==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_server_ip());
+
+ stringItemsSketchHashMap.get("twoPkt").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ twoPkt.getCollectionValue(twoPkt,cnRecordLog);
+
+ stringItemsSketchHashMap.get("twoPkt").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),twoPkt);
+
+
+ }
+
+
+
+
+ }
+ break;
+ case "threePkt":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch threePktItemsSketch = new ItemsSketch<>(1048576);
+
+ threePktItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_internal_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setThreeDimension(cnRecordLog),collectionValue);
+
+
+
+
+
+
+ DimensionItemsSketch threePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threePktItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("threePkt",threePktDimensionItemsSketch);
+
+ }else {
+
+ stringItemsSketchHashMap.get("threePkt").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+
+ CollectionValue threePkt = stringItemsSketchHashMap.get("threePkt").getStringCollectionValueHashMap().get(Dimension.setThreeDimension(cnRecordLog));
+
+
+ if (threePkt==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_server_ip());
+
+ stringItemsSketchHashMap.get("threePkt").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ threePkt.getCollectionValue(threePkt,cnRecordLog);
+
+ stringItemsSketchHashMap.get("threePkt").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),threePkt);
+
+
+ }
+
+
+
+
+
+ }
+ break;
+ case "fourPkt":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch fourPktItemsSketch = new ItemsSketch<>(1048576);
+
+ fourPktItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_external_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setFourDimension(cnRecordLog),collectionValue);
+
+
+
+ DimensionItemsSketch fourPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourPktItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("fourPkt",fourPktDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("fourPkt").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+
+
+ CollectionValue fourPkt = stringItemsSketchHashMap.get("fourPkt").getStringCollectionValueHashMap().get(Dimension.setFourDimension(cnRecordLog));
+
+
+ if (fourPkt==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_external_ip());
+
+ stringItemsSketchHashMap.get("fourPkt").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ fourPkt.getCollectionValue(fourPkt,cnRecordLog);
+
+ stringItemsSketchHashMap.get("fourPkt").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),fourPkt);
+
+
+ }
+
+
+
+
+
+
+ }
+ break;
+ case "fivePkt":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch fivePktItemsSketch = new ItemsSketch<>(1048576);
+
+ fivePktItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getHttp_domain());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setFiveDimension(cnRecordLog),collectionValue);
+
+
+
+
+
+ DimensionItemsSketch fivePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fivePktItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("fivePkt",fivePktDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("fivePkt").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+ CollectionValue fivePkt = stringItemsSketchHashMap.get("fivePkt").getStringCollectionValueHashMap().get(Dimension.setFiveDimension(cnRecordLog));
+
+
+ if (fivePkt==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getHttp_domain());
+
+ stringItemsSketchHashMap.get("fivePkt").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ fivePkt.getCollectionValue(fivePkt,cnRecordLog);
+
+ stringItemsSketchHashMap.get("fivePkt").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),fivePkt);
+
+
+ }
+
+
+
+
+
+ }
+ break;
+ case "sixPkt":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+
+ ItemsSketch sixPktItemsSketch = new ItemsSketch<>(1048576);
+
+ sixPktItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_subscriber_id());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setSixDimension(cnRecordLog),collectionValue);
+
+
+
+
+
+ DimensionItemsSketch sixPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixPktItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("sixPkt",sixPktDimensionItemsSketch);
+
+ }else {
+
+ stringItemsSketchHashMap.get("sixPkt").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+ CollectionValue sixPkt = stringItemsSketchHashMap.get("sixPkt").getStringCollectionValueHashMap().get(Dimension.setSixDimension(cnRecordLog));
+
+
+ if (sixPkt==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_subscriber_id());
+
+ stringItemsSketchHashMap.get("sixPkt").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ sixPkt.getCollectionValue(sixPkt,cnRecordLog);
+
+ stringItemsSketchHashMap.get("sixPkt").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),sixPkt);
+
+
+ }
+
+
+
+
+
+ }
+ break;
+ case "sevenPkt":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch sevenPktItemsSketch = new ItemsSketch<>(1048576);
+
+ sevenPktItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_app_label());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setSevenDimension(cnRecordLog),collectionValue);
+
+
+
+
+
+ DimensionItemsSketch sevenPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenPktItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("sevenPkt",sevenPktDimensionItemsSketch);
+
+ }else {
+
+ stringItemsSketchHashMap.get("sevenPkt").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
+
+ CollectionValue sevenPkt = stringItemsSketchHashMap.get("sevenPkt").getStringCollectionValueHashMap().get(Dimension.setSevenDimension(cnRecordLog));
+
+
+ if (sevenPkt==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"packets");
+ collectionValue.setSource(cnRecordLog.getCommon_app_label());
+
+ stringItemsSketchHashMap.get("sevenPkt").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ sevenPkt.getCollectionValue(sevenPkt,cnRecordLog);
+
+ stringItemsSketchHashMap.get("sevenPkt").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),sevenPkt);
+
+
+ }
+
+
+
+
+ }
+ break;
+ case "oneByte":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+
+ ItemsSketch oneByteItemsSketch = new ItemsSketch<>(1048576);
+
+
+ oneByteItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_client_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setOneDimension(cnRecordLog),collectionValue);
+
+
+
+
+
+ DimensionItemsSketch oneByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE,oneByteItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("oneByte",oneByteDimensionItemsSketch);
+
+ }else {
+
+ stringItemsSketchHashMap.get("oneByte").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+
+ CollectionValue oneByte = stringItemsSketchHashMap.get("oneByte").getStringCollectionValueHashMap().get(Dimension.setOneDimension(cnRecordLog));//从key获取集合
+
+ if (oneByte==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_client_ip());
+
+ stringItemsSketchHashMap.get("oneByte").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ oneByte.getCollectionValue(oneByte,cnRecordLog);
+
+ stringItemsSketchHashMap.get("oneByte").getStringCollectionValueHashMap().put(Dimension.setOneDimension(cnRecordLog),oneByte);
+
+
+ }
+
+
+
+ }
+ break;
+ case "twoByte":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch twoByteItemsSketch = new ItemsSketch<>(1048576);
+
+ twoByteItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_server_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setTwoDimension(cnRecordLog),collectionValue);
+
+
+
+
+
+ DimensionItemsSketch twoByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoByteItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("twoByte",twoByteDimensionItemsSketch);
+
+ }else {
+
+ stringItemsSketchHashMap.get("twoByte").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+ CollectionValue twoByte = stringItemsSketchHashMap.get("twoByte").getStringCollectionValueHashMap().get(Dimension.setTwoDimension(cnRecordLog));
+
+
+ if (twoByte==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_server_ip());
+
+ stringItemsSketchHashMap.get("twoByte").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ twoByte.getCollectionValue(twoByte,cnRecordLog);
+
+ stringItemsSketchHashMap.get("twoByte").getStringCollectionValueHashMap().put(Dimension.setTwoDimension(cnRecordLog),twoByte);
+
+
+ }
+
+
+
+ }
+ break;
+ case "threeByte":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+
+ ItemsSketch threeByteItemsSketch = new ItemsSketch<>(1048576);
+
+ threeByteItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_internal_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setThreeDimension(cnRecordLog),collectionValue);
+
+
+
+
+
+
+ DimensionItemsSketch threeByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threeByteItemsSketch,stringCollectionValueHashMap);
+
+
+ stringItemsSketchHashMap.put("threeByte",threeByteDimensionItemsSketch);
+
+ }else {
+
+ stringItemsSketchHashMap.get("threeByte").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+
+ CollectionValue threeByte = stringItemsSketchHashMap.get("threeByte").getStringCollectionValueHashMap().get(Dimension.setThreeDimension(cnRecordLog));
+
+
+ if (threeByte==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_server_ip());
+
+ stringItemsSketchHashMap.get("threeByte").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ threeByte.getCollectionValue(threeByte,cnRecordLog);
+
+ stringItemsSketchHashMap.get("threeByte").getStringCollectionValueHashMap().put(Dimension.setThreeDimension(cnRecordLog),threeByte);
+
+
+ }
+
+
+
+
+
+ }
+ break;
+ case "fourByte":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+
+
+ ItemsSketch fourByteItemsSketch = new ItemsSketch<>(1048576);
+
+ fourByteItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_external_ip());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setFourDimension(cnRecordLog),collectionValue);
+
+
+
+
+
+ DimensionItemsSketch fourByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourByteItemsSketch,stringCollectionValueHashMap);
+
+
+
+ stringItemsSketchHashMap.put("fourByte",fourByteDimensionItemsSketch);
+
+ }else {
+
+ stringItemsSketchHashMap.get("fourByte").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+ CollectionValue fourByte = stringItemsSketchHashMap.get("fourByte").getStringCollectionValueHashMap().get(Dimension.setFourDimension(cnRecordLog));
+
+
+ if (fourByte==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_external_ip());
+
+ stringItemsSketchHashMap.get("fourByte").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ fourByte.getCollectionValue(fourByte,cnRecordLog);
+
+ stringItemsSketchHashMap.get("fourByte").getStringCollectionValueHashMap().put(Dimension.setFourDimension(cnRecordLog),fourByte);
+
+
+ }
+
+
+
+
+
+
+ }
+
+ break;
+ case "fiveByte":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+
+ ItemsSketch fiveByteItemsSketch = new ItemsSketch<>(1048576);
+
+ fiveByteItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getHttp_domain());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setFiveDimension(cnRecordLog),collectionValue);
+
+
+
+
+
+
+ DimensionItemsSketch fiveByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fiveByteItemsSketch,stringCollectionValueHashMap);
+
+ stringItemsSketchHashMap.put("fiveByte",fiveByteDimensionItemsSketch);
+
+ }else {
+ stringItemsSketchHashMap.get("fiveByte").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+ CollectionValue fiveByte = stringItemsSketchHashMap.get("fiveByte").getStringCollectionValueHashMap().get(Dimension.setFiveDimension(cnRecordLog));
+
+
+ if (fiveByte==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getHttp_domain());
+
+ stringItemsSketchHashMap.get("fiveByte").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ fiveByte.getCollectionValue(fiveByte,cnRecordLog);
+
+ stringItemsSketchHashMap.get("fiveByte").getStringCollectionValueHashMap().put(Dimension.setFiveDimension(cnRecordLog),fiveByte);
+
+
+ }
+
+
+
+
+
+ }
+
+ break;
+ case "sixByte":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+
+ ItemsSketch sixByteItemsSketch = new ItemsSketch<>(1048576);
+
+ sixByteItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_subscriber_id());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setSixDimension(cnRecordLog),collectionValue);
+
+
+
+
+
+ DimensionItemsSketch sixByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixByteItemsSketch,stringCollectionValueHashMap);
+
+
+ stringItemsSketchHashMap.put("sixByte",sixByteDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("sixByte").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+ CollectionValue sixByte = stringItemsSketchHashMap.get("sixByte").getStringCollectionValueHashMap().get(Dimension.setSixDimension(cnRecordLog));
+
+
+ if (sixByte==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_subscriber_id());
+
+ stringItemsSketchHashMap.get("sixByte").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ sixByte.getCollectionValue(sixByte,cnRecordLog);
+
+ stringItemsSketchHashMap.get("sixByte").getStringCollectionValueHashMap().put(Dimension.setSixDimension(cnRecordLog),sixByte);
+
+
+ }
+
+
+
+
+
+
+ }
+
+ break;
+ case "sevenByte":
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+
+ ItemsSketch sevenByteItemsSketch = new ItemsSketch<>(1048576);
+
+
+ sevenByteItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_app_label());
+ HashMap stringCollectionValueHashMap = new HashMap<>();
+ stringCollectionValueHashMap.put(Dimension.setSevenDimension(cnRecordLog),collectionValue);
+
+
+
+
+
+ DimensionItemsSketch sevenByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenByteItemsSketch,stringCollectionValueHashMap);
+
+
+ stringItemsSketchHashMap.put("sevenByte",sevenByteDimensionItemsSketch);
+
+
+ }else {
+
+ stringItemsSketchHashMap.get("sevenByte").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
+
+ CollectionValue sevenByte = stringItemsSketchHashMap.get("sevenByte").getStringCollectionValueHashMap().get(Dimension.setSevenDimension(cnRecordLog));
+
+
+ if (sevenByte==null){
+
+ CollectionValue collectionValue = new CollectionValue(cnRecordLog,"bytes");
+ collectionValue.setSource(cnRecordLog.getCommon_app_label());
+
+ stringItemsSketchHashMap.get("sevenByte").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),collectionValue);
+
+ }else {//做加和
+
+ sevenByte.getCollectionValue(sevenByte,cnRecordLog);
+
+ stringItemsSketchHashMap.get("sevenByte").getStringCollectionValueHashMap().put(Dimension.setSevenDimension(cnRecordLog),sevenByte);
+
+
+ }
+
+
+
+ }
+ break;
+
+
+ }
+
+
+
+
+
+ return stringItemsSketchHashMap;
+ }
+
+ @Override
+ public HashMap getResult(HashMap stringItemsSketchHashMap) {
+
+
+ return stringItemsSketchHashMap;
+ }
+
+ @Override
+ public HashMap merge(HashMap stringItemsSketchHashMap, HashMap acc1) {
+
+
+
+ return null;
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/function/DatasketchForUrlAggregate.java b/src/main/java/com/galaxy/tsg/function/DatasketchForUrlAggregate.java
new file mode 100644
index 0000000..03b04f3
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/DatasketchForUrlAggregate.java
@@ -0,0 +1,107 @@
+package com.galaxy.tsg.function;
+
+
+import com.galaxy.tsg.pojo.TopUrlEntity;
+import com.galaxy.tsg.pojo.UrlEntity;
+import org.apache.datasketches.frequencies.ItemsSketch;
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+import java.util.HashMap;
+
+/**
+ * @author fy
+ * @version 1.0
+ * @date 2023/2/14 17:29
+ *
+ *Session_record top10000 21个窗口计算
+ */
+public class DatasketchForUrlAggregate implements AggregateFunction, HashMap> {
+
+
+ @Override
+ public HashMap createAccumulator() {
+ return new HashMap(1048576);
+ }
+
+ @Override
+ public HashMap add(UrlEntity cnRecordLog, HashMap stringItemsSketchHashMap) {
+
+// String dimension = cnRecordLog.getCommon_client_ip();//维度
+// System.out.println(dimension);
+
+ if(stringItemsSketchHashMap.isEmpty()) {
+
+ ItemsSketch eightSessionItemsSketch = new ItemsSketch<>(1048576);//新建
+
+ eightSessionItemsSketch.update(Dimension.setEightDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+
+ TopUrlEntity topUrlEntity = new TopUrlEntity();
+ topUrlEntity.setSession_num(cnRecordLog.getCommon_sessions());
+ topUrlEntity.setStat_time(System.currentTimeMillis() / 1000);
+ topUrlEntity.setUrl(cnRecordLog.getHttp_url());
+ topUrlEntity.setVsys_id(cnRecordLog.getCommon_vsys_id());
+ HashMap stringTopUrlEntityHashMap = new HashMap<>();
+ stringTopUrlEntityHashMap.put(Dimension.setEightDimension(cnRecordLog),topUrlEntity);
+
+
+ DimensionItemsSketch eightSessionDimensionItemsSketch = new DimensionItemsSketch(stringTopUrlEntityHashMap,Dimension.EIGHT,eightSessionItemsSketch);
+
+ stringItemsSketchHashMap.put("eightSession", eightSessionDimensionItemsSketch);
+
+
+ }else {
+
+
+ stringItemsSketchHashMap.get("eightSession").getItemsSketch().update(Dimension.setEightDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
+
+ TopUrlEntity eightSession = stringItemsSketchHashMap.get("eightSession").getStringTopUrlEntityHashMap().get(Dimension.setEightDimension(cnRecordLog));//从key获取集合
+
+ if (eightSession==null){
+
+ TopUrlEntity topUrlEntity = new TopUrlEntity();
+ topUrlEntity.setSession_num(cnRecordLog.getCommon_sessions());
+ topUrlEntity.setStat_time(System.currentTimeMillis() / 1000);
+ topUrlEntity.setUrl(cnRecordLog.getHttp_url());
+ topUrlEntity.setVsys_id(cnRecordLog.getCommon_vsys_id());
+
+ stringItemsSketchHashMap.get("eightSession").getStringTopUrlEntityHashMap().put(Dimension.setEightDimension(cnRecordLog),eightSession);
+
+ }else {//做加和
+
+
+ eightSession.setSession_num(eightSession.getSession_num()+cnRecordLog.getCommon_sessions());
+
+
+ stringItemsSketchHashMap.get("eightSession").getStringTopUrlEntityHashMap().put(Dimension.setEightDimension(cnRecordLog),eightSession);
+
+
+ }
+
+
+
+
+
+
+ }
+
+
+
+ return stringItemsSketchHashMap;
+ }
+
+ @Override
+ public HashMap getResult(HashMap stringItemsSketchHashMap) {
+
+
+ return stringItemsSketchHashMap;
+ }
+
+ @Override
+ public HashMap merge(HashMap stringItemsSketchHashMap, HashMap acc1) {
+
+
+
+ return null;
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/function/DimensionItemsSketch.java b/src/main/java/com/galaxy/tsg/function/DimensionItemsSketch.java
index 457cbc2..ab4efeb 100644
--- a/src/main/java/com/galaxy/tsg/function/DimensionItemsSketch.java
+++ b/src/main/java/com/galaxy/tsg/function/DimensionItemsSketch.java
@@ -1,8 +1,11 @@
package com.galaxy.tsg.function;
+import com.galaxy.tsg.pojo.TopUrlEntity;
import org.apache.datasketches.frequencies.ItemsSketch;
+import java.util.HashMap;
+
/**
* @author fy
* @version 1.0
@@ -14,6 +17,11 @@ public class DimensionItemsSketch {
private String dimension;//维度
private ItemsSketch itemsSketch;//对应的
+ private HashMap stringCollectionValueHashMap;//同指标集合成的值
+
+ private HashMap stringTopUrlEntityHashMap;//同指标集合成的值
+
+
public DimensionItemsSketch(String dimension, ItemsSketch itemsSketch) {
this.dimension = dimension;
@@ -21,6 +29,19 @@ public class DimensionItemsSketch {
}
+ public DimensionItemsSketch(String dimension, ItemsSketch itemsSketch, HashMap stringCollectionValueHashMap) {
+ this.dimension = dimension;
+ this.itemsSketch = itemsSketch;
+ this.stringCollectionValueHashMap = stringCollectionValueHashMap;
+ }
+
+
+ public DimensionItemsSketch(HashMap stringTopUrlEntityHashMap, String dimension, ItemsSketch itemsSketch) {
+ this.dimension = dimension;
+ this.itemsSketch = itemsSketch;
+ this.stringTopUrlEntityHashMap = stringTopUrlEntityHashMap;
+ }
+
public String getDimension() {
return dimension;
}
@@ -36,6 +57,24 @@ public class DimensionItemsSketch {
public void setItemsSketch(ItemsSketch itemsSketch) {
this.itemsSketch = itemsSketch;
}
+
+
+ public HashMap getStringCollectionValueHashMap() {
+ return stringCollectionValueHashMap;
+ }
+
+ public void setStringCollectionValueHashMap(HashMap stringCollectionValueHashMap) {
+ this.stringCollectionValueHashMap = stringCollectionValueHashMap;
+ }
+
+
+ public HashMap getStringTopUrlEntityHashMap() {
+ return stringTopUrlEntityHashMap;
+ }
+
+ public void setStringTopUrlEntityHashMap(HashMap stringTopUrlEntityHashMap) {
+ this.stringTopUrlEntityHashMap = stringTopUrlEntityHashMap;
+ }
}
diff --git a/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java
index f7f9207..c98545b 100644
--- a/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java
+++ b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult5.java
@@ -1,5 +1,7 @@
package com.galaxy.tsg.function;
+import com.alibaba.fastjson.JSONObject;
+import com.galaxy.tsg.pojo.TopUrlEntity;
import org.apache.datasketches.frequencies.ErrorType;
import org.apache.datasketches.frequencies.ItemsSketch;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
@@ -50,8 +52,18 @@ public class UserCountWindowResult5 extends ProcessAllWindowFunction, HashMap> {
-
-
- @Override
- public HashMap createAccumulator() {
- return new HashMap(1048576);
- }
-
- @Override
- public HashMap add(Entity cnRecordLog, HashMap stringItemsSketchHashMap) {
-
-// String dimension = cnRecordLog.getCommon_client_ip();//维度
-// System.out.println(dimension);
-
- if(stringItemsSketchHashMap.isEmpty()) {
-
- ItemsSketch oneSessionItemsSketch = new ItemsSketch<>(1048576);//新建
- ItemsSketch twoSessionItemsSketch = new ItemsSketch<>(1048576);//新建
- ItemsSketch threeSessionItemsSketch = new ItemsSketch<>(1048576);//新建
- ItemsSketch fourSessionItemsSketch = new ItemsSketch<>(1048576);//新建
- ItemsSketch fiveSessionItemsSketch = new ItemsSketch<>(1048576);//新建
- ItemsSketch sixSessionItemsSketch = new ItemsSketch<>(1048576);//新建
- ItemsSketch sevenSessionItemsSketch = new ItemsSketch<>(1048576);//新建
-
- ItemsSketch onePktItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch twoPktItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch threePktItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch fourPktItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch fivePktItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch sixPktItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch sevenPktItemsSketch = new ItemsSketch<>(1048576);
-
-
-
- ItemsSketch oneByteItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch twoByteItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch threeByteItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch fourByteItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch fiveByteItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch sixByteItemsSketch = new ItemsSketch<>(1048576);
- ItemsSketch sevenByteItemsSketch = new ItemsSketch<>(1048576);
-
-
- oneSessionItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- twoSessionItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- threeSessionItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- fourSessionItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- fiveSessionItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- sixSessionItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- sevenSessionItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
-
- onePktItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- twoPktItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- threePktItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- fourPktItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- fivePktItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- sixPktItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- sevenPktItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
-
-
- oneByteItemsSketch.update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- twoByteItemsSketch.update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- threeByteItemsSketch.update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- fourByteItemsSketch.update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- fiveByteItemsSketch.update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- sixByteItemsSketch.update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- sevenByteItemsSketch.update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
-
-
-
- DimensionItemsSketch oneSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE, oneSessionItemsSketch);
- DimensionItemsSketch twoSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoSessionItemsSketch);
- DimensionItemsSketch threeSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threeSessionItemsSketch);
- DimensionItemsSketch fourSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourSessionItemsSketch);
- DimensionItemsSketch fiveSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fiveSessionItemsSketch);
- DimensionItemsSketch sixSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixSessionItemsSketch);
- DimensionItemsSketch sevenSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenSessionItemsSketch);
-
- DimensionItemsSketch onePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE,onePktItemsSketch);
- DimensionItemsSketch twoPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoPktItemsSketch);
- DimensionItemsSketch threePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threePktItemsSketch);
- DimensionItemsSketch fourPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourPktItemsSketch);
- DimensionItemsSketch fivePktDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fivePktItemsSketch);
- DimensionItemsSketch sixPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixPktItemsSketch);
- DimensionItemsSketch sevenPktDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenPktItemsSketch);
-
- DimensionItemsSketch oneByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.ONE,oneByteItemsSketch);
- DimensionItemsSketch twoByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.TWO,twoByteItemsSketch);
- DimensionItemsSketch threeByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.THREE,threeByteItemsSketch);
- DimensionItemsSketch fourByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.FOUR,fourByteItemsSketch);
- DimensionItemsSketch fiveByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.FIVE,fiveByteItemsSketch);
- DimensionItemsSketch sixByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SIX,sixByteItemsSketch);
- DimensionItemsSketch sevenByteDimensionItemsSketch = new DimensionItemsSketch(Dimension.SEVEN,sevenByteItemsSketch);
-
-
- stringItemsSketchHashMap.put("oneSession", oneSessionDimensionItemsSketch);
- stringItemsSketchHashMap.put("twoSession", twoSessionDimensionItemsSketch);
- stringItemsSketchHashMap.put("threeSession", threeSessionDimensionItemsSketch);
- stringItemsSketchHashMap.put("fourSession", fourSessionDimensionItemsSketch);
- stringItemsSketchHashMap.put("fiveSession", fiveSessionDimensionItemsSketch);
- stringItemsSketchHashMap.put("sixSession", sixSessionDimensionItemsSketch);
- stringItemsSketchHashMap.put("sevenSession", sevenSessionDimensionItemsSketch);
-
-
- stringItemsSketchHashMap.put("onePkt",onePktDimensionItemsSketch);
- stringItemsSketchHashMap.put("twoPkt",twoPktDimensionItemsSketch);
- stringItemsSketchHashMap.put("threePkt",threePktDimensionItemsSketch);
- stringItemsSketchHashMap.put("fourPkt",fourPktDimensionItemsSketch);
- stringItemsSketchHashMap.put("fivePkt",fivePktDimensionItemsSketch);
- stringItemsSketchHashMap.put("sixPkt",sixPktDimensionItemsSketch);
- stringItemsSketchHashMap.put("sevenPkt",sevenPktDimensionItemsSketch);
-
-
- stringItemsSketchHashMap.put("oneByte",oneByteDimensionItemsSketch);
- stringItemsSketchHashMap.put("twoByte",twoByteDimensionItemsSketch);
- stringItemsSketchHashMap.put("threeByte",threeByteDimensionItemsSketch);
- stringItemsSketchHashMap.put("fourByte",fourByteDimensionItemsSketch);
- stringItemsSketchHashMap.put("fiveByte",fiveByteDimensionItemsSketch);
- stringItemsSketchHashMap.put("sixByte",sixByteDimensionItemsSketch);
- stringItemsSketchHashMap.put("sevenByte",sevenByteDimensionItemsSketch);
-
-
- }else {
-
- stringItemsSketchHashMap.get("oneSession").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- stringItemsSketchHashMap.get("twoSession").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- stringItemsSketchHashMap.get("threeSession").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- stringItemsSketchHashMap.get("fourSession").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- stringItemsSketchHashMap.get("fiveSession").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- stringItemsSketchHashMap.get("sixSession").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
- stringItemsSketchHashMap.get("sevenSession").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
-
- stringItemsSketchHashMap.get("onePkt").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- stringItemsSketchHashMap.get("twoPkt").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- stringItemsSketchHashMap.get("threePkt").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- stringItemsSketchHashMap.get("fourPkt").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- stringItemsSketchHashMap.get("fivePkt").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- stringItemsSketchHashMap.get("sixPkt").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
- stringItemsSketchHashMap.get("sevenPkt").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_pkt_num()+cnRecordLog.getCommon_s2c_pkt_num());
-
-
- stringItemsSketchHashMap.get("oneByte").getItemsSketch().update(Dimension.setOneDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- stringItemsSketchHashMap.get("twoByte").getItemsSketch().update(Dimension.setTwoDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- stringItemsSketchHashMap.get("threeByte").getItemsSketch().update(Dimension.setThreeDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- stringItemsSketchHashMap.get("fourByte").getItemsSketch().update(Dimension.setFourDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- stringItemsSketchHashMap.get("fiveByte").getItemsSketch().update(Dimension.setFiveDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- stringItemsSketchHashMap.get("sixByte").getItemsSketch().update(Dimension.setSixDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
- stringItemsSketchHashMap.get("sevenByte").getItemsSketch().update(Dimension.setSevenDimension(cnRecordLog),cnRecordLog.getCommon_c2s_byte_num()+cnRecordLog.getCommon_s2c_byte_num());
-
-
-
-
-
- }
-
-
-
- return stringItemsSketchHashMap;
- }
-
- @Override
- public HashMap getResult(HashMap stringItemsSketchHashMap) {
-
-
- return stringItemsSketchHashMap;
- }
-
- @Override
- public HashMap merge(HashMap stringItemsSketchHashMap, HashMap acc1) {
-
-// System.out.println("合并");
-// HashMap> unionSketchHashMap = new HashMap<>();
-// ItemsSketch sessionItemsSketch = new ItemsSketch<>(1048576);
-// ItemsSketch pktItemsSketch = new ItemsSketch<>(1048576);
-// ItemsSketch byteItemsSketch = new ItemsSketch<>(1048576);
-
-// ItemsSketch session_stringItemsSketch = stringItemsSketchHashMap.get("session");
-// ItemsSketch pkt_stringItemsSketch = stringItemsSketchHashMap.get("pkt");
-// ItemsSketch byte_stringItemsSketch = stringItemsSketchHashMap.get("byte");
-
-// ItemsSketch session_acc1 = acc1.get("session");
-// ItemsSketch pkt_acc1 = acc1.get("pkt");
-// ItemsSketch byte_acc1 = acc1.get("byte");
-
-// sessionItemsSketch.merge(session_stringItemsSketch);
-// sessionItemsSketch.merge(session_acc1);
-
-// pktItemsSketch.merge(pkt_stringItemsSketch);
-// pktItemsSketch.merge(pkt_acc1);
-//
-// byteItemsSketch.merge(byte_stringItemsSketch);
-// byteItemsSketch.merge(byte_acc1);
-
-// unionSketchHashMap.put("session",sessionItemsSketch);
-// unionSketchHashMap.put("pkt",pktItemsSketch);
-// unionSketchHashMap.put("byte",byteItemsSketch);
-
-
- return null;
- }
-}
diff --git a/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg6.java b/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg6.java
deleted file mode 100644
index 2005e1f..0000000
--- a/src/main/java/com/galaxy/tsg/function/UserHashMapCountAgg6.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package com.galaxy.tsg.function;
-
-
-import com.galaxy.tsg.pojo.Entity;
-import com.galaxy.tsg.pojo.UrlEntity;
-import org.apache.datasketches.frequencies.ItemsSketch;
-import org.apache.flink.api.common.functions.AggregateFunction;
-
-import java.util.HashMap;
-
-/**
- * @author fy
- * @version 1.0
- * @date 2023/2/14 17:29
- *
- *Session_record top10000 21个窗口计算
- */
-public class UserHashMapCountAgg6 implements AggregateFunction, HashMap> {
-
-
- @Override
- public HashMap createAccumulator() {
- return new HashMap(1048576);
- }
-
- @Override
- public HashMap add(UrlEntity cnRecordLog, HashMap stringItemsSketchHashMap) {
-
-// String dimension = cnRecordLog.getCommon_client_ip();//维度
-// System.out.println(dimension);
-
- if(stringItemsSketchHashMap.isEmpty()) {
-
- ItemsSketch eightSessionItemsSketch = new ItemsSketch<>(1048576);//新建
-
- eightSessionItemsSketch.update(Dimension.setEightDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
-
- DimensionItemsSketch eightSessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.EIGHT,eightSessionItemsSketch);
-
- stringItemsSketchHashMap.put("eightSession", eightSessionDimensionItemsSketch);
-
-
- }else {
-
-
-
- stringItemsSketchHashMap.get("eightSession").getItemsSketch().update(Dimension.setEightDimension(cnRecordLog),cnRecordLog.getCommon_sessions());
-
-
-
-
- }
-
-
-
- return stringItemsSketchHashMap;
- }
-
- @Override
- public HashMap getResult(HashMap stringItemsSketchHashMap) {
-
-
- return stringItemsSketchHashMap;
- }
-
- @Override
- public HashMap merge(HashMap stringItemsSketchHashMap, HashMap acc1) {
-
-// System.out.println("合并");
-// HashMap> unionSketchHashMap = new HashMap<>();
-// ItemsSketch sessionItemsSketch = new ItemsSketch<>(1048576);
-// ItemsSketch pktItemsSketch = new ItemsSketch<>(1048576);
-// ItemsSketch byteItemsSketch = new ItemsSketch<>(1048576);
-
-// ItemsSketch session_stringItemsSketch = stringItemsSketchHashMap.get("session");
-// ItemsSketch pkt_stringItemsSketch = stringItemsSketchHashMap.get("pkt");
-// ItemsSketch byte_stringItemsSketch = stringItemsSketchHashMap.get("byte");
-
-// ItemsSketch session_acc1 = acc1.get("session");
-// ItemsSketch pkt_acc1 = acc1.get("pkt");
-// ItemsSketch byte_acc1 = acc1.get("byte");
-
-// sessionItemsSketch.merge(session_stringItemsSketch);
-// sessionItemsSketch.merge(session_acc1);
-
-// pktItemsSketch.merge(pkt_stringItemsSketch);
-// pktItemsSketch.merge(pkt_acc1);
-//
-// byteItemsSketch.merge(byte_stringItemsSketch);
-// byteItemsSketch.merge(byte_acc1);
-
-// unionSketchHashMap.put("session",sessionItemsSketch);
-// unionSketchHashMap.put("pkt",pktItemsSketch);
-// unionSketchHashMap.put("byte",byteItemsSketch);
-
-
- return null;
- }
-}