diff --git a/src/main/java/com/galaxy/tsg/Toptask.java b/src/main/java/com/galaxy/tsg/Toptask.java index 53065e2..7329789 100644 --- a/src/main/java/com/galaxy/tsg/Toptask.java +++ b/src/main/java/com/galaxy/tsg/Toptask.java @@ -228,8 +228,8 @@ public class Toptask { case 2: //datasketch - //clientip聚合TOP + //clientip聚合TOP SingleOutputStreamOperator clientipdStream2 = inputForSession.filter(new FilterFunction() { @Override public boolean filter(Entity value) throws Exception { @@ -237,12 +237,16 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); - SingleOutputStreamOperator windowedStream2 = clientipdStream2.keyBy(new groupBySelector("common_client_ip")) + + + clientipdStream2.keyBy(new groupBySelector("common_client_ip")) .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM); - DataStream windoweddStream2 = windowedStream2.keyBy(new oneKeySelector()) - .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); - windoweddStream2.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3); + .aggregate(new DatasketchForMetricsAggregate2("common_client_ip"), new UserCountWindowResult6()) +// .setParallelism(TASK_PARALLELISM) +// .print(); + .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3); + + //serverip聚合TOP @@ -253,12 +257,14 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); - SingleOutputStreamOperator windowedStreamForServerIp2 = serveripdStream2.keyBy(new groupBySelector("common_server_ip")) + + serveripdStream2.keyBy(new groupBySelector("common_server_ip")) .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_server_ip")).setParallelism(TASK_PARALLELISM); - DataStream windoweddStreamForServerIp2 = windowedStreamForServerIp2.keyBy(new oneKeySelector()) - .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); - windoweddStreamForServerIp2.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3); + .aggregate(new DatasketchForMetricsAggregate2("common_server_ip"), new UserCountWindowResult6()) +// .print(); + .addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3); + + //common_internal_ip聚合TOP @@ -269,13 +275,16 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); - SingleOutputStreamOperator windowedStreamForInternal2 = internalStream2.keyBy(new groupBySelector("common_internal_ip")) - .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_internal_ip")).setParallelism(TASK_PARALLELISM); - DataStream WindoweddStreamForInternal2 = windowedStreamForInternal2.keyBy(new oneKeySelector()) - .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); - WindoweddStreamForInternal2.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3); + + internalStream2.keyBy(new groupBySelector("common_internal_ip")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .aggregate(new DatasketchForMetricsAggregate2("common_internal_ip"), new UserCountWindowResult6()) +// .print(); + .addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3); +// +// +// //common_external_ip聚合TOP SingleOutputStreamOperator externalStream2 = inputForSession.filter(new FilterFunction() { @@ -285,12 +294,14 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); - SingleOutputStreamOperator windowedStreamForExternal2 = externalStream2.keyBy(new groupBySelector("common_external_ip")) + + externalStream2.keyBy(new groupBySelector("common_external_ip")) .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_external_ip")).setParallelism(TASK_PARALLELISM); - DataStream WindoweddStreamForExternal2 = windowedStreamForExternal2.keyBy(new oneKeySelector()) - .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); - WindoweddStreamForExternal2.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3); + .aggregate(new DatasketchForMetricsAggregate2("common_external_ip"), new UserCountWindowResult6()) +// .print(); + .addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3); + + //http_domain聚合TOP @@ -301,13 +312,17 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); - SingleOutputStreamOperator windowedStreamForDomain2 = domainStream2.keyBy(new groupBySelector("http_domain")) - .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "http_domain")).setParallelism(TASK_PARALLELISM); - DataStream WindoweddStreamForDomain2 = windowedStreamForDomain2.keyBy(new oneKeySelector()) - .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); - WindoweddStreamForDomain2.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3); + + domainStream2.keyBy(new groupBySelector("http_domain")) + .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) + .aggregate(new DatasketchForMetricsAggregate2("http_domain"), new UserCountWindowResult6()) +// .print(); + .addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3); +// +// +// + //common_subscriber_id聚合TOP SingleOutputStreamOperator userStream2 = inputForSession.filter(new FilterFunction() { @Override public boolean filter(Entity value) throws Exception { @@ -315,13 +330,15 @@ public class Toptask { } }).assignTimestampsAndWatermarks(strategyForSession); - //common_subscriber_id聚合TOP - SingleOutputStreamOperator windowedStreamForUser2 = userStream2.keyBy(new groupBySelector("common_subscriber_id")) + + userStream2.keyBy(new groupBySelector("common_subscriber_id")) .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_subscriber_id")).setParallelism(TASK_PARALLELISM); - DataStream WindoweddStreamForUser2 = windowedStreamForUser2.keyBy(new oneKeySelector()) - .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); - WindoweddStreamForUser2.addSink(getKafkaSink("TOP-USER")).setParallelism(3); + .aggregate(new DatasketchForMetricsAggregate2("common_subscriber_id"), new UserCountWindowResult6()) +// .print(); + .addSink(getKafkaSink("TOP-USER")).setParallelism(3); +// + + //common_app_label聚合求全量 @@ -336,7 +353,9 @@ public class Toptask { appNameStream2.keyBy(new groupBySelector("common_app_label")) .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new metricsCalculateForApp()).addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM); + .reduce(new metricsAggregationReduce(), new metricsCalculateForApp()) + .addSink(getKafkaSink("TRAFFIC-APP-STAT")) + .setParallelism(TASK_PARALLELISM); @@ -349,12 +368,14 @@ public class Toptask { }).assignTimestampsAndWatermarks(strategyForSecurity); - SingleOutputStreamOperator windowedStreamForUrl2 = UrlStream2.keyBy(new twoKeySelector()) + + UrlStream2.keyBy(new twoKeySelector()) .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new UrlAggregationReduce(), new DatasketchUrlCalculate(URL_TOP_LIMIT)).setParallelism(TASK_PARALLELISM); - DataStream WindoweddStreamForUrl2 = windowedStreamForUrl2.keyBy(new oneKeySelector()) - .process(new TopNHotItemsForUrl(URL_TOP_LIMIT)).setParallelism(1); - WindoweddStreamForUrl2.addSink(getKafkaSink("TOP-URLS")).setParallelism(3); + .aggregate(new DatasketchForUrlAggregate2(), new UserCountWindowResult7()) +// .print(); + .addSink(getKafkaSink("TOP-URLS")).setParallelism(3); + + @@ -363,6 +384,168 @@ public class Toptask { break; + + + + +// clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .aggregate(new DatasketchForMetricsAggregate("clientIpSession"), new UserCountWindowResult5()) +//// .print(); +// .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3); + +// clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .aggregate(new DatasketchForMetricsAggregate("clientIpPkt"), new UserCountWindowResult5()) +//// .print(); +// .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3); +// +// +// clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .aggregate(new DatasketchForMetricsAggregate("clientIpByte"), new UserCountWindowResult5()) +//// .print(); +// .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3); + + + + + + + +// +// //clientip聚合TOP +// +// SingleOutputStreamOperator clientipdStream2 = inputForSession.filter(new FilterFunction() { +// @Override +// public boolean filter(Entity value) throws Exception { +// return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol()); +// } +// }).assignTimestampsAndWatermarks(strategyForSession); +// +// SingleOutputStreamOperator windowedStream2 = clientipdStream2.keyBy(new groupBySelector("common_client_ip")) +// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM); +// DataStream windoweddStream2 = windowedStream2.keyBy(new oneKeySelector()) +// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); +// windoweddStream2.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3); +// +// //serverip聚合TOP +// +// SingleOutputStreamOperator serveripdStream2 = inputForSession.filter(new FilterFunction() { +// @Override +// public boolean filter(Entity value) throws Exception { +// return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol()); +// } +// }).assignTimestampsAndWatermarks(strategyForSession); +// +// SingleOutputStreamOperator windowedStreamForServerIp2 = serveripdStream2.keyBy(new groupBySelector("common_server_ip")) +// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_server_ip")).setParallelism(TASK_PARALLELISM); +// DataStream windoweddStreamForServerIp2 = windowedStreamForServerIp2.keyBy(new oneKeySelector()) +// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); +// windoweddStreamForServerIp2.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3); +// +// +// //common_internal_ip聚合TOP +// SingleOutputStreamOperator internalStream2 = inputForSession.filter(new FilterFunction() { +// @Override +// public boolean filter(Entity value) throws Exception { +// return StringUtil.isNotEmpty(value.getCommon_internal_ip()); +// } +// }).assignTimestampsAndWatermarks(strategyForSession); +// +// SingleOutputStreamOperator windowedStreamForInternal2 = internalStream2.keyBy(new groupBySelector("common_internal_ip")) +// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_internal_ip")).setParallelism(TASK_PARALLELISM); +// DataStream WindoweddStreamForInternal2 = windowedStreamForInternal2.keyBy(new oneKeySelector()) +// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); +// WindoweddStreamForInternal2.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3); +// +// //common_external_ip聚合TOP +// +// SingleOutputStreamOperator externalStream2 = inputForSession.filter(new FilterFunction() { +// @Override +// public boolean filter(Entity value) throws Exception { +// return StringUtil.isNotEmpty(value.getCommon_external_ip()); +// } +// }).assignTimestampsAndWatermarks(strategyForSession); +// +// SingleOutputStreamOperator windowedStreamForExternal2 = externalStream2.keyBy(new groupBySelector("common_external_ip")) +// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_external_ip")).setParallelism(TASK_PARALLELISM); +// DataStream WindoweddStreamForExternal2 = windowedStreamForExternal2.keyBy(new oneKeySelector()) +// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); +// WindoweddStreamForExternal2.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3); +// +// //http_domain聚合TOP +// +// SingleOutputStreamOperator domainStream2 = inputForSession.filter(new FilterFunction() { +// @Override +// public boolean filter(Entity value) throws Exception { +// return StringUtil.isNotEmpty(value.getHttp_domain()); +// } +// }).assignTimestampsAndWatermarks(strategyForSession); +// +// SingleOutputStreamOperator windowedStreamForDomain2 = domainStream2.keyBy(new groupBySelector("http_domain")) +// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "http_domain")).setParallelism(TASK_PARALLELISM); +// DataStream WindoweddStreamForDomain2 = windowedStreamForDomain2.keyBy(new oneKeySelector()) +// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); +// WindoweddStreamForDomain2.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3); +// +// SingleOutputStreamOperator userStream2 = inputForSession.filter(new FilterFunction() { +// @Override +// public boolean filter(Entity value) throws Exception { +// return StringUtil.isNotEmpty(value.getCommon_subscriber_id()); +// } +// }).assignTimestampsAndWatermarks(strategyForSession); +// +// //common_subscriber_id聚合TOP +// SingleOutputStreamOperator windowedStreamForUser2 = userStream2.keyBy(new groupBySelector("common_subscriber_id")) +// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_subscriber_id")).setParallelism(TASK_PARALLELISM); +// DataStream WindoweddStreamForUser2 = windowedStreamForUser2.keyBy(new oneKeySelector()) +// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3); +// WindoweddStreamForUser2.addSink(getKafkaSink("TOP-USER")).setParallelism(3); +// +// +// //common_app_label聚合求全量 +// SingleOutputStreamOperator appNameStream2 = inputForSession.filter(new FilterFunction() { +// @Override +// public boolean filter(Entity value) throws Exception { +// return StringUtil.isNotEmpty(value.getCommon_app_label()); +// } +// }).assignTimestampsAndWatermarks(strategyForSession); +// +// +// +// appNameStream2.keyBy(new groupBySelector("common_app_label")) +// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .reduce(new metricsAggregationReduce(), new metricsCalculateForApp()).addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM); +// +// +// +// //url聚合session求top +// SingleOutputStreamOperator UrlStream2 = inputForUrl.filter(new FilterFunction() { +// @Override +// public boolean filter(UrlEntity value) throws Exception { +// return StringUtil.isNotEmpty(value.getHttp_url()); +// } +// }).assignTimestampsAndWatermarks(strategyForSecurity); +// +// +// SingleOutputStreamOperator windowedStreamForUrl2 = UrlStream2.keyBy(new twoKeySelector()) +// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) +// .reduce(new UrlAggregationReduce(), new DatasketchUrlCalculate(URL_TOP_LIMIT)).setParallelism(TASK_PARALLELISM); +// DataStream WindoweddStreamForUrl2 = windowedStreamForUrl2.keyBy(new oneKeySelector()) +// .process(new TopNHotItemsForUrl(URL_TOP_LIMIT)).setParallelism(1); +// WindoweddStreamForUrl2.addSink(getKafkaSink("TOP-URLS")).setParallelism(3); +// + + + + + +// break; + } diff --git a/src/main/java/com/galaxy/tsg/function/ApplabelAggregate.java b/src/main/java/com/galaxy/tsg/function/ApplabelAggregate.java new file mode 100644 index 0000000..98c46eb --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/ApplabelAggregate.java @@ -0,0 +1,9 @@ +package com.galaxy.tsg.function; + +/** + * @author fy + * @version 1.0 + * @date 2023/3/14 10:19 + */ +public class ApplabelAggregate { +} diff --git a/src/main/java/com/galaxy/tsg/function/CollectionValue.java b/src/main/java/com/galaxy/tsg/function/CollectionValue.java index e8bd256..16c0936 100644 --- a/src/main/java/com/galaxy/tsg/function/CollectionValue.java +++ b/src/main/java/com/galaxy/tsg/function/CollectionValue.java @@ -24,6 +24,13 @@ public class CollectionValue { public String order_by; public String source; + private String destination ; + private String domain; + private String subscriber_id; + private String app_name; + + + public CollectionValue(long c2s_byte_num, long c2s_pkt_num, long s2c_byte_num, long s2c_pkt_num, long session_num, long stat_time, long vsys_id, String data_center, String device_group, String order_by, String source) { this.c2s_byte_num = c2s_byte_num; this.c2s_pkt_num = c2s_pkt_num; @@ -39,7 +46,7 @@ public class CollectionValue { } - public CollectionValue(Entity entity,String key ) { + public CollectionValue(Entity entity,String orderby,String key ) { @@ -52,8 +59,58 @@ public class CollectionValue { this.vsys_id = entity.getCommon_vsys_id(); this.data_center = entity.getCommon_data_center(); this.device_group = entity.getCommon_device_group(); - this.order_by = key; -// this.source = source;//分情况 + this.order_by = orderby; + + this.source = entity.getCommon_client_ip();//分情况???????????? + + + switch(key) { + case "common_client_ip": + this.source =entity.getCommon_client_ip(); + break; + case "common_server_ip": + this.destination =entity.getCommon_server_ip(); + break; + case "common_internal_ip": + this.source =entity.getCommon_internal_ip(); + break; + case "common_external_ip": + this.destination =entity.getCommon_external_ip(); + break; + case "common_subscriber_id": + this.subscriber_id =entity.getCommon_subscriber_id(); + + break; + + case "common_app_label": + this.app_name =entity.getCommon_app_label(); + break; + + default: + + + } + + + + + } + + + public CollectionValue(Entity entity,String orderby ) { + + + this.c2s_byte_num = entity.getCommon_c2s_byte_num(); + this.c2s_pkt_num = entity.getCommon_c2s_pkt_num(); + this.s2c_byte_num = entity.getCommon_s2c_byte_num(); + this.s2c_pkt_num = entity.getCommon_s2c_pkt_num(); + this.session_num = entity.getCommon_sessions(); + this.stat_time = System.currentTimeMillis() / 1000; + this.vsys_id = entity.getCommon_vsys_id(); + this.data_center = entity.getCommon_data_center(); + this.device_group = entity.getCommon_device_group(); + this.order_by = orderby; + } @@ -172,6 +229,38 @@ public class CollectionValue { } + public String getDestination() { + return destination; + } + + public void setDestination(String destination) { + this.destination = destination; + } + + public String getDomain() { + return domain; + } + + public void setDomain(String domain) { + this.domain = domain; + } + + public String getSubscriber_id() { + return subscriber_id; + } + + public void setSubscriber_id(String subscriber_id) { + this.subscriber_id = subscriber_id; + } + + public String getApp_name() { + return app_name; + } + + public void setApp_name(String app_name) { + this.app_name = app_name; + } + @Override public String toString() { return "CollectionValue{" + diff --git a/src/main/java/com/galaxy/tsg/function/DatasketchForMetricsAggregate2.java b/src/main/java/com/galaxy/tsg/function/DatasketchForMetricsAggregate2.java new file mode 100644 index 0000000..1af203a --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/DatasketchForMetricsAggregate2.java @@ -0,0 +1,147 @@ +package com.galaxy.tsg.function; + + +import com.galaxy.tsg.pojo.Entity; +import org.apache.datasketches.frequencies.ItemsSketch; +import org.apache.flink.api.common.functions.AggregateFunction; + +import java.util.HashMap; + +/** + * @author fy + * @version 1.0 + * @date 2023/2/14 17:29 + * + *Session_record top10000 21个窗口计算 + */ +public class DatasketchForMetricsAggregate2 implements AggregateFunction, HashMap> { + + private String key ; + + + public DatasketchForMetricsAggregate2(String key ){ + + this.key = key; + } + + + @Override + public HashMap createAccumulator() { + return new HashMap(32768); + } + + @Override + public HashMap add(Entity entity, HashMap stringItemsSketchHashMap) { + +// String dimension = cnRecordLog.getCommon_client_ip();//维度 +// System.out.println(dimension); + + + if(stringItemsSketchHashMap.isEmpty()) { + + ItemsSketch sessionItemsSketch = new ItemsSketch<>(32768);//新建 + ItemsSketch pktItemsSketch = new ItemsSketch<>(32768);//新建 + ItemsSketch byteItemsSketch = new ItemsSketch<>(32768);//新建 + + sessionItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_sessions()); + pktItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); + byteItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); + + + CollectionValue sessionsCollectionValue = new CollectionValue(entity,"sessions",key); +// sessionsCollectionValue.setSource(entity.getCommon_client_ip()); + HashMap sessionsStringCollectionValueHashMap = new HashMap<>(); + sessionsStringCollectionValueHashMap.put(Dimension.setDimensionValue(entity,key),sessionsCollectionValue); + + CollectionValue packetsCollectionValue = new CollectionValue(entity,"packets",key); +// packetsCollectionValue.setSource(entity.getCommon_client_ip()); + HashMap packetsStringCollectionValueHashMap = new HashMap<>(); + packetsStringCollectionValueHashMap.put(Dimension.setDimensionValue(entity,key),packetsCollectionValue); + + CollectionValue bytesCollectionValue = new CollectionValue(entity,"bytes",key); +// bytesCollectionValue.setSource(entity.getCommon_client_ip()); + HashMap bytesStringCollectionValueHashMap = new HashMap<>(); + bytesStringCollectionValueHashMap.put(Dimension.setDimensionValue(entity,key),bytesCollectionValue); + + + + DimensionItemsSketch sessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.setDimensionTag(key), sessionItemsSketch,sessionsStringCollectionValueHashMap); + DimensionItemsSketch packetsDimensionItemsSketch = new DimensionItemsSketch(Dimension.setDimensionTag(key), pktItemsSketch,packetsStringCollectionValueHashMap); + DimensionItemsSketch bytesDimensionItemsSketch = new DimensionItemsSketch(Dimension.setDimensionTag(key), byteItemsSketch,bytesStringCollectionValueHashMap); + + + stringItemsSketchHashMap.put(key+"Session", sessionDimensionItemsSketch); + stringItemsSketchHashMap.put(key+"Packets", packetsDimensionItemsSketch); + stringItemsSketchHashMap.put(key+"Bytes", bytesDimensionItemsSketch); + + + + + }else { + + stringItemsSketchHashMap.get(key+"Session").getItemsSketch().update(Dimension.setDimensionValue(entity,key),entity.getCommon_sessions()); + stringItemsSketchHashMap.get(key+"Packets").getItemsSketch().update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); + stringItemsSketchHashMap.get(key+"Bytes").getItemsSketch().update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); + + + + CollectionValue sessionsCollectionValue = stringItemsSketchHashMap.get(key+"Session").getStringCollectionValueHashMap().get(Dimension.setDimensionValue(entity,key));//从key获取集合 + CollectionValue packetsCollectionValue = stringItemsSketchHashMap.get(key+"Packets").getStringCollectionValueHashMap().get(Dimension.setDimensionValue(entity,key));//从key获取集合 + CollectionValue bytesCollectionValue = stringItemsSketchHashMap.get(key+"Bytes").getStringCollectionValueHashMap().get(Dimension.setDimensionValue(entity,key));//从key获取集合 + + + + if (sessionsCollectionValue==null){ + + sessionsCollectionValue = new CollectionValue(entity,"sessions",key); + sessionsCollectionValue.setSource(entity.getCommon_client_ip()); + + packetsCollectionValue = new CollectionValue(entity,"packets",key); + packetsCollectionValue.setSource(entity.getCommon_client_ip()); + + bytesCollectionValue = new CollectionValue(entity,"bytes",key); + bytesCollectionValue.setSource(entity.getCommon_client_ip()); + + + stringItemsSketchHashMap.get(key+"Session").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),sessionsCollectionValue); + stringItemsSketchHashMap.get(key+"Packets").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),packetsCollectionValue); + stringItemsSketchHashMap.get(key+"Bytes").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),bytesCollectionValue); + + + + }else {//做加和 + + sessionsCollectionValue.getCollectionValue(sessionsCollectionValue,entity); + packetsCollectionValue.getCollectionValue(packetsCollectionValue,entity); + bytesCollectionValue.getCollectionValue(bytesCollectionValue,entity); + + + + stringItemsSketchHashMap.get(key+"Session").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),sessionsCollectionValue); + stringItemsSketchHashMap.get(key+"Packets").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),packetsCollectionValue); + stringItemsSketchHashMap.get(key+"Bytes").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),bytesCollectionValue); + + } + + + } + + + + return stringItemsSketchHashMap; + } + + @Override + public HashMap getResult(HashMap stringItemsSketchHashMap) { + + + return stringItemsSketchHashMap; + } + + @Override + public HashMap merge(HashMap stringDimensionItemsSketchHashMap, HashMap acc1) { + + + return null; + } +} diff --git a/src/main/java/com/galaxy/tsg/function/DatasketchForUrlAggregate2.java b/src/main/java/com/galaxy/tsg/function/DatasketchForUrlAggregate2.java new file mode 100644 index 0000000..60ffbf7 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/DatasketchForUrlAggregate2.java @@ -0,0 +1,107 @@ +package com.galaxy.tsg.function; + + +import com.galaxy.tsg.pojo.TopUrlEntity; +import com.galaxy.tsg.pojo.UrlEntity; +import org.apache.datasketches.frequencies.ItemsSketch; +import org.apache.flink.api.common.functions.AggregateFunction; + +import java.util.HashMap; + +/** + * @author fy + * @version 1.0 + * @date 2023/2/14 17:29 + * + *Session_record top10000 21个窗口计算 + */ +public class DatasketchForUrlAggregate2 implements AggregateFunction, HashMap> { + + + @Override + public HashMap createAccumulator() { + return new HashMap(1048576); + } + + @Override + public HashMap add(UrlEntity urlEntity, HashMap stringItemsSketchHashMap) { + +// String dimension = cnRecordLog.getCommon_client_ip();//维度 +// System.out.println(dimension); + + if(stringItemsSketchHashMap.isEmpty()) { + + ItemsSketch urlSessionItemsSketch = new ItemsSketch<>(1048576);//新建 + + urlSessionItemsSketch.update(Dimension.setUrlDimension(urlEntity),urlEntity.getCommon_sessions()); + + + TopUrlEntity topUrlEntity = new TopUrlEntity(); + topUrlEntity.setSession_num(urlEntity.getCommon_sessions()); + topUrlEntity.setStat_time(System.currentTimeMillis() / 1000); + topUrlEntity.setUrl(urlEntity.getHttp_url()); + topUrlEntity.setVsys_id(urlEntity.getCommon_vsys_id()); + HashMap stringTopUrlEntityHashMap = new HashMap<>(); + stringTopUrlEntityHashMap.put(Dimension.setUrlDimension(urlEntity),topUrlEntity); + + + DimensionItemsSketch urlSessionDimensionItemsSketch = new DimensionItemsSketch(stringTopUrlEntityHashMap,Dimension.URL,urlSessionItemsSketch); + + stringItemsSketchHashMap.put("urlSession", urlSessionDimensionItemsSketch); + + + }else { + + + stringItemsSketchHashMap.get("urlSession").getItemsSketch().update(Dimension.setUrlDimension(urlEntity),urlEntity.getCommon_sessions()); + + TopUrlEntity urlSession = stringItemsSketchHashMap.get("urlSession").getStringTopUrlEntityHashMap().get(Dimension.setUrlDimension(urlEntity));//从key获取集合 + + if (urlSession==null){ + + TopUrlEntity topUrlEntity = new TopUrlEntity(); + topUrlEntity.setSession_num(urlEntity.getCommon_sessions()); + topUrlEntity.setStat_time(System.currentTimeMillis() / 1000); + topUrlEntity.setUrl(urlEntity.getHttp_url()); + topUrlEntity.setVsys_id(urlEntity.getCommon_vsys_id()); + + stringItemsSketchHashMap.get("urlSession").getStringTopUrlEntityHashMap().put(Dimension.setUrlDimension(urlEntity),urlSession); + + }else {//做加和 + + + urlSession.setSession_num(urlSession.getSession_num()+urlEntity.getCommon_sessions()); + + + stringItemsSketchHashMap.get("urlSession").getStringTopUrlEntityHashMap().put(Dimension.setUrlDimension(urlEntity),urlSession); + + + } + + + + + + + } + + + + return stringItemsSketchHashMap; + } + + @Override + public HashMap getResult(HashMap stringItemsSketchHashMap) { + + + return stringItemsSketchHashMap; + } + + @Override + public HashMap merge(HashMap stringItemsSketchHashMap, HashMap acc1) { + + + + return null; + } +} diff --git a/src/main/java/com/galaxy/tsg/function/DatasketchMetricsCalculate.java b/src/main/java/com/galaxy/tsg/function/DatasketchMetricsCalculate.java index 47427fb..aabe4c2 100644 --- a/src/main/java/com/galaxy/tsg/function/DatasketchMetricsCalculate.java +++ b/src/main/java/com/galaxy/tsg/function/DatasketchMetricsCalculate.java @@ -1,6 +1,5 @@ package com.galaxy.tsg.function; -import com.alibaba.fastjson.JSONObject; import com.galaxy.tsg.pojo.*; import org.apache.datasketches.frequencies.ErrorType; import org.apache.datasketches.frequencies.ItemsSketch; @@ -53,16 +52,16 @@ public class DatasketchMetricsCalculate extends ProcessWindowFunction< for (Entity entity:iterable){ //处理session - sessionItemsSketch.update(Dimension.setDimension(entity,key),entity.getCommon_sessions()); - sessionResultEntityHashMap.put(Dimension.setDimension(entity,key),enrichessionResult(context.window().getEnd() / 1000, entity)); + sessionItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_sessions()); + sessionResultEntityHashMap.put(Dimension.setDimensionValue(entity,key),enrichessionResult(context.window().getEnd() / 1000, entity)); //处理pkt - pktItemsSketch.update(Dimension.setDimension(entity,key),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); - packetResultEntityHashMap.put(Dimension.setDimension(entity,key),enrichPacketResult(context.window().getEnd() / 1000, entity)); + pktItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num()); + packetResultEntityHashMap.put(Dimension.setDimensionValue(entity,key),enrichPacketResult(context.window().getEnd() / 1000, entity)); //处理byte - byteItemsSketch.update(Dimension.setDimension(entity,key),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); - byteResultEntityHashMap.put(Dimension.setDimension(entity,key),enrichByteResult(context.window().getEnd() / 1000, entity)); + byteItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num()); + byteResultEntityHashMap.put(Dimension.setDimensionValue(entity,key),enrichByteResult(context.window().getEnd() / 1000, entity)); } diff --git a/src/main/java/com/galaxy/tsg/function/Dimension.java b/src/main/java/com/galaxy/tsg/function/Dimension.java index 17e6476..16c332e 100644 --- a/src/main/java/com/galaxy/tsg/function/Dimension.java +++ b/src/main/java/com/galaxy/tsg/function/Dimension.java @@ -79,7 +79,7 @@ public class Dimension { - public static String setDimension(Entity entity,String key){ + public static String setDimensionValue(Entity entity, String key){ String dimension = ""; @@ -120,6 +120,44 @@ public class Dimension { + public static String setDimensionTag(String key){ + + String dimension = ""; + + switch (key) { + case "common_client_ip": + dimension = CLIENTIP; + break; + case "common_server_ip": + dimension = SERVERIP; + break; + case "common_internal_ip": + dimension = INTERNALIP; + break; + case "common_external_ip": + dimension = EXTERNALIP; + break; + case "http_domain": + dimension = DOMAIN; + break; + + case "common_subscriber_id": + dimension = SUBSCRIBERID; + break; + + case "common_app_label": + dimension = APPLABEL; + break; + + default: + + } + + + + + return dimension; + } diff --git a/src/main/java/com/galaxy/tsg/function/UserCountWindowResult6.java b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult6.java new file mode 100644 index 0000000..b7b7d36 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult6.java @@ -0,0 +1,89 @@ +package com.galaxy.tsg.function; + +import com.alibaba.fastjson.JSONObject; +import com.galaxy.tsg.pojo.TopUrlEntity; +import org.apache.datasketches.frequencies.ErrorType; +import org.apache.datasketches.frequencies.ItemsSketch; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static com.galaxy.tsg.config.commonConfig.TOP_LIMIT; + +/** + * @author fy + * @version 1.0 + * @date 2023/3/7 15:50 + */ +public class UserCountWindowResult6 extends ProcessWindowFunction, String, Tuple4,TimeWindow> { + + + @Override + public void process(Tuple4 stringLongStringStringTuple4, Context context, Iterable> iterable, Collector collector) throws Exception { + + + HashMap dataHashMap = iterable.iterator().next(); + + +// System.out.println(dataHashMap.toString()); + + Set> entries = dataHashMap.entrySet(); + + + for (Map.Entry entry : entries) { + +// System.out.println(entry.getKey()+""); +// stringBuilder.append(entry.getKey()+"\n"); + + ItemsSketch.Row[] items = entry.getValue().getItemsSketch().getFrequentItems(ErrorType.NO_FALSE_POSITIVES); + + for (int i = 0; i < items.length; i++) { + + +// String resultStr = "No." + (i + 1) + " " +// + "ip:" + items[i].getItem() + " " +// + " Est:" + items[i].getEstimate() +// + " UB:" + items[i].getUpperBound() +// + " LB:" + items[i].getLowerBound(); + + String jsonStr = ""; + if (!entry.getKey().equals("urlSession")) { + CollectionValue collectionValue = entry.getValue().getStringCollectionValueHashMap().get(items[i].getItem()); + collectionValue.setStat_time(context.window().getEnd() / 1000); + jsonStr = JSONObject.toJSONString(collectionValue); + } else { + TopUrlEntity topUrlEntity = entry.getValue().getStringTopUrlEntityHashMap().get(items[i].getItem()); + topUrlEntity.setStat_time(context.window().getEnd() / 1000); + jsonStr = JSONObject.toJSONString(topUrlEntity); + } + + + collector.collect(jsonStr); + +// String item = items[i].toString(); +// stringBuilder.append(resultStr); + +// stringBuilder.append("\n"); + + + if (i == TOP_LIMIT)//够条数就结束 + break; + + } + + + } + + + } + + +} diff --git a/src/main/java/com/galaxy/tsg/function/UserCountWindowResult7.java b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult7.java new file mode 100644 index 0000000..f93bac3 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/UserCountWindowResult7.java @@ -0,0 +1,87 @@ +package com.galaxy.tsg.function; + +import com.alibaba.fastjson.JSONObject; +import com.galaxy.tsg.pojo.TopUrlEntity; +import org.apache.datasketches.frequencies.ErrorType; +import org.apache.datasketches.frequencies.ItemsSketch; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static com.galaxy.tsg.config.commonConfig.TOP_LIMIT; + +/** + * @author fy + * @version 1.0 + * @date 2023/3/7 15:50 + */ +public class UserCountWindowResult7 extends ProcessWindowFunction, String, Tuple2,TimeWindow> { + + + @Override + public void process(Tuple2 stringLongTuple2, Context context, Iterable> iterable, Collector collector) throws Exception { + + + HashMap dataHashMap = iterable.iterator().next(); + + +// System.out.println(dataHashMap.toString()); + + Set> entries = dataHashMap.entrySet(); + + + for (Map.Entry entry : entries) { + +// System.out.println(entry.getKey()+""); +// stringBuilder.append(entry.getKey()+"\n"); + + ItemsSketch.Row[] items = entry.getValue().getItemsSketch().getFrequentItems(ErrorType.NO_FALSE_POSITIVES); + + for (int i = 0; i < items.length; i++) { + + +// String resultStr = "No." + (i + 1) + " " +// + "ip:" + items[i].getItem() + " " +// + " Est:" + items[i].getEstimate() +// + " UB:" + items[i].getUpperBound() +// + " LB:" + items[i].getLowerBound(); + + String jsonStr = ""; + if (!entry.getKey().equals("urlSession")) { + CollectionValue collectionValue = entry.getValue().getStringCollectionValueHashMap().get(items[i].getItem()); + collectionValue.setStat_time(context.window().getEnd() / 1000); + jsonStr = JSONObject.toJSONString(collectionValue); + } else { + TopUrlEntity topUrlEntity = entry.getValue().getStringTopUrlEntityHashMap().get(items[i].getItem()); + topUrlEntity.setStat_time(context.window().getEnd() / 1000); + jsonStr = JSONObject.toJSONString(topUrlEntity); + } + + + collector.collect(jsonStr); + +// String item = items[i].toString(); +// stringBuilder.append(resultStr); + +// stringBuilder.append("\n"); + + + if (i == TOP_LIMIT)//够条数就结束 + break; + + } + + + } + + + } + + +}