datasketch方法处理top n,改回aggregate和window

This commit is contained in:
fengyi
2023-03-14 14:33:51 +08:00
parent f9c33dd93c
commit a27e41cfca
9 changed files with 799 additions and 51 deletions

View File

@@ -228,8 +228,8 @@ public class Toptask {
case 2:
//datasketch
//clientip聚合TOP
//clientip聚合TOP
SingleOutputStreamOperator<Entity> clientipdStream2 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
@@ -237,12 +237,16 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStream2 = clientipdStream2.keyBy(new groupBySelector("common_client_ip"))
clientipdStream2.keyBy(new groupBySelector("common_client_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> windoweddStream2 = windowedStream2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
windoweddStream2.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
.aggregate(new DatasketchForMetricsAggregate2("common_client_ip"), new UserCountWindowResult6())
// .setParallelism(TASK_PARALLELISM)
// .print();
.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
//serverip聚合TOP
@@ -253,12 +257,14 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForServerIp2 = serveripdStream2.keyBy(new groupBySelector("common_server_ip"))
serveripdStream2.keyBy(new groupBySelector("common_server_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_server_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> windoweddStreamForServerIp2 = windowedStreamForServerIp2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
windoweddStreamForServerIp2.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
.aggregate(new DatasketchForMetricsAggregate2("common_server_ip"), new UserCountWindowResult6())
// .print();
.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
//common_internal_ip聚合TOP
@@ -269,13 +275,16 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForInternal2 = internalStream2.keyBy(new groupBySelector("common_internal_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_internal_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForInternal2 = windowedStreamForInternal2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForInternal2.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
internalStream2.keyBy(new groupBySelector("common_internal_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate2("common_internal_ip"), new UserCountWindowResult6())
// .print();
.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
//
//
//
//common_external_ip聚合TOP
SingleOutputStreamOperator<Entity> externalStream2 = inputForSession.filter(new FilterFunction<Entity>() {
@@ -285,12 +294,14 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForExternal2 = externalStream2.keyBy(new groupBySelector("common_external_ip"))
externalStream2.keyBy(new groupBySelector("common_external_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_external_ip")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForExternal2 = windowedStreamForExternal2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForExternal2.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
.aggregate(new DatasketchForMetricsAggregate2("common_external_ip"), new UserCountWindowResult6())
// .print();
.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
//http_domain聚合TOP
@@ -301,13 +312,17 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
SingleOutputStreamOperator<ResultEntity> windowedStreamForDomain2 = domainStream2.keyBy(new groupBySelector("http_domain"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "http_domain")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForDomain2 = windowedStreamForDomain2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForDomain2.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
domainStream2.keyBy(new groupBySelector("http_domain"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.aggregate(new DatasketchForMetricsAggregate2("http_domain"), new UserCountWindowResult6())
// .print();
.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
//
//
//
//common_subscriber_id聚合TOP
SingleOutputStreamOperator<Entity> userStream2 = inputForSession.filter(new FilterFunction<Entity>() {
@Override
public boolean filter(Entity value) throws Exception {
@@ -315,13 +330,15 @@ public class Toptask {
}
}).assignTimestampsAndWatermarks(strategyForSession);
//common_subscriber_id聚合TOP
SingleOutputStreamOperator<ResultEntity> windowedStreamForUser2 = userStream2.keyBy(new groupBySelector("common_subscriber_id"))
userStream2.keyBy(new groupBySelector("common_subscriber_id"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_subscriber_id")).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForUser2 = windowedStreamForUser2.keyBy(new oneKeySelector())
.process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
WindoweddStreamForUser2.addSink(getKafkaSink("TOP-USER")).setParallelism(3);
.aggregate(new DatasketchForMetricsAggregate2("common_subscriber_id"), new UserCountWindowResult6())
// .print();
.addSink(getKafkaSink("TOP-USER")).setParallelism(3);
//
//common_app_label聚合求全量
@@ -336,7 +353,9 @@ public class Toptask {
appNameStream2.keyBy(new groupBySelector("common_app_label"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new metricsAggregationReduce(), new metricsCalculateForApp()).addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
.reduce(new metricsAggregationReduce(), new metricsCalculateForApp())
.addSink(getKafkaSink("TRAFFIC-APP-STAT"))
.setParallelism(TASK_PARALLELISM);
@@ -349,12 +368,14 @@ public class Toptask {
}).assignTimestampsAndWatermarks(strategyForSecurity);
SingleOutputStreamOperator<ResultEntity> windowedStreamForUrl2 = UrlStream2.keyBy(new twoKeySelector())
UrlStream2.keyBy(new twoKeySelector())
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
.reduce(new UrlAggregationReduce(), new DatasketchUrlCalculate(URL_TOP_LIMIT)).setParallelism(TASK_PARALLELISM);
DataStream<String> WindoweddStreamForUrl2 = windowedStreamForUrl2.keyBy(new oneKeySelector())
.process(new TopNHotItemsForUrl(URL_TOP_LIMIT)).setParallelism(1);
WindoweddStreamForUrl2.addSink(getKafkaSink("TOP-URLS")).setParallelism(3);
.aggregate(new DatasketchForUrlAggregate2(), new UserCountWindowResult7())
// .print();
.addSink(getKafkaSink("TOP-URLS")).setParallelism(3);
@@ -363,6 +384,168 @@ public class Toptask {
break;
// clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .aggregate(new DatasketchForMetricsAggregate("clientIpSession"), new UserCountWindowResult5())
//// .print();
// .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
// clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .aggregate(new DatasketchForMetricsAggregate("clientIpPkt"), new UserCountWindowResult5())
//// .print();
// .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
//
//
// clientipdStream2.windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .aggregate(new DatasketchForMetricsAggregate("clientIpByte"), new UserCountWindowResult5())
//// .print();
// .addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
//
// //clientip聚合TOP
//
// SingleOutputStreamOperator<Entity> clientipdStream2 = inputForSession.filter(new FilterFunction<Entity>() {
// @Override
// public boolean filter(Entity value) throws Exception {
// return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
// }
// }).assignTimestampsAndWatermarks(strategyForSession);
//
// SingleOutputStreamOperator<ResultEntity> windowedStream2 = clientipdStream2.keyBy(new groupBySelector("common_client_ip"))
// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_client_ip")).setParallelism(TASK_PARALLELISM);
// DataStream<String> windoweddStream2 = windowedStream2.keyBy(new oneKeySelector())
// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
// windoweddStream2.addSink(getKafkaSink("TOP-CLIENT-IP")).setParallelism(3);
//
// //serverip聚合TOP
//
// SingleOutputStreamOperator<Entity> serveripdStream2 = inputForSession.filter(new FilterFunction<Entity>() {
// @Override
// public boolean filter(Entity value) throws Exception {
// return "IPv6_TCP".equals(value.getCommon_l4_protocol()) || "IPv4_TCP".equals(value.getCommon_l4_protocol());
// }
// }).assignTimestampsAndWatermarks(strategyForSession);
//
// SingleOutputStreamOperator<ResultEntity> windowedStreamForServerIp2 = serveripdStream2.keyBy(new groupBySelector("common_server_ip"))
// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_server_ip")).setParallelism(TASK_PARALLELISM);
// DataStream<String> windoweddStreamForServerIp2 = windowedStreamForServerIp2.keyBy(new oneKeySelector())
// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
// windoweddStreamForServerIp2.addSink(getKafkaSink("TOP-SERVER-IP")).setParallelism(3);
//
//
// //common_internal_ip聚合TOP
// SingleOutputStreamOperator<Entity> internalStream2 = inputForSession.filter(new FilterFunction<Entity>() {
// @Override
// public boolean filter(Entity value) throws Exception {
// return StringUtil.isNotEmpty(value.getCommon_internal_ip());
// }
// }).assignTimestampsAndWatermarks(strategyForSession);
//
// SingleOutputStreamOperator<ResultEntity> windowedStreamForInternal2 = internalStream2.keyBy(new groupBySelector("common_internal_ip"))
// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_internal_ip")).setParallelism(TASK_PARALLELISM);
// DataStream<String> WindoweddStreamForInternal2 = windowedStreamForInternal2.keyBy(new oneKeySelector())
// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
// WindoweddStreamForInternal2.addSink(getKafkaSink("TOP-INTERNAL-HOST")).setParallelism(3);
//
// //common_external_ip聚合TOP
//
// SingleOutputStreamOperator<Entity> externalStream2 = inputForSession.filter(new FilterFunction<Entity>() {
// @Override
// public boolean filter(Entity value) throws Exception {
// return StringUtil.isNotEmpty(value.getCommon_external_ip());
// }
// }).assignTimestampsAndWatermarks(strategyForSession);
//
// SingleOutputStreamOperator<ResultEntity> windowedStreamForExternal2 = externalStream2.keyBy(new groupBySelector("common_external_ip"))
// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_external_ip")).setParallelism(TASK_PARALLELISM);
// DataStream<String> WindoweddStreamForExternal2 = windowedStreamForExternal2.keyBy(new oneKeySelector())
// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
// WindoweddStreamForExternal2.addSink(getKafkaSink("TOP-EXTERNAL-HOST")).setParallelism(3);
//
// //http_domain聚合TOP
//
// SingleOutputStreamOperator<Entity> domainStream2 = inputForSession.filter(new FilterFunction<Entity>() {
// @Override
// public boolean filter(Entity value) throws Exception {
// return StringUtil.isNotEmpty(value.getHttp_domain());
// }
// }).assignTimestampsAndWatermarks(strategyForSession);
//
// SingleOutputStreamOperator<ResultEntity> windowedStreamForDomain2 = domainStream2.keyBy(new groupBySelector("http_domain"))
// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "http_domain")).setParallelism(TASK_PARALLELISM);
// DataStream<String> WindoweddStreamForDomain2 = windowedStreamForDomain2.keyBy(new oneKeySelector())
// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
// WindoweddStreamForDomain2.addSink(getKafkaSink("TOP-WEBSITE-DOMAIN")).setParallelism(3);
//
// SingleOutputStreamOperator<Entity> userStream2 = inputForSession.filter(new FilterFunction<Entity>() {
// @Override
// public boolean filter(Entity value) throws Exception {
// return StringUtil.isNotEmpty(value.getCommon_subscriber_id());
// }
// }).assignTimestampsAndWatermarks(strategyForSession);
//
// //common_subscriber_id聚合TOP
// SingleOutputStreamOperator<ResultEntity> windowedStreamForUser2 = userStream2.keyBy(new groupBySelector("common_subscriber_id"))
// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .reduce(new metricsAggregationReduce(), new DatasketchMetricsCalculate(TOP_LIMIT, "common_subscriber_id")).setParallelism(TASK_PARALLELISM);
// DataStream<String> WindoweddStreamForUser2 = windowedStreamForUser2.keyBy(new oneKeySelector())
// .process(new TopNHotItems(TOP_LIMIT)).setParallelism(3);
// WindoweddStreamForUser2.addSink(getKafkaSink("TOP-USER")).setParallelism(3);
//
//
// //common_app_label聚合求全量
// SingleOutputStreamOperator<Entity> appNameStream2 = inputForSession.filter(new FilterFunction<Entity>() {
// @Override
// public boolean filter(Entity value) throws Exception {
// return StringUtil.isNotEmpty(value.getCommon_app_label());
// }
// }).assignTimestampsAndWatermarks(strategyForSession);
//
//
//
// appNameStream2.keyBy(new groupBySelector("common_app_label"))
// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .reduce(new metricsAggregationReduce(), new metricsCalculateForApp()).addSink(getKafkaSink("TRAFFIC-APP-STAT")).setParallelism(TASK_PARALLELISM);
//
//
//
// //url聚合session求top
// SingleOutputStreamOperator<UrlEntity> UrlStream2 = inputForUrl.filter(new FilterFunction<UrlEntity>() {
// @Override
// public boolean filter(UrlEntity value) throws Exception {
// return StringUtil.isNotEmpty(value.getHttp_url());
// }
// }).assignTimestampsAndWatermarks(strategyForSecurity);
//
//
// SingleOutputStreamOperator<ResultEntity> windowedStreamForUrl2 = UrlStream2.keyBy(new twoKeySelector())
// .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
// .reduce(new UrlAggregationReduce(), new DatasketchUrlCalculate(URL_TOP_LIMIT)).setParallelism(TASK_PARALLELISM);
// DataStream<String> WindoweddStreamForUrl2 = windowedStreamForUrl2.keyBy(new oneKeySelector())
// .process(new TopNHotItemsForUrl(URL_TOP_LIMIT)).setParallelism(1);
// WindoweddStreamForUrl2.addSink(getKafkaSink("TOP-URLS")).setParallelism(3);
//
// break;
}

View File

@@ -0,0 +1,9 @@
package com.galaxy.tsg.function;
/**
* @author fy
* @version 1.0
* @date 2023/3/14 10:19
*/
public class ApplabelAggregate {
}

View File

@@ -24,6 +24,13 @@ public class CollectionValue {
public String order_by;
public String source;
private String destination ;
private String domain;
private String subscriber_id;
private String app_name;
public CollectionValue(long c2s_byte_num, long c2s_pkt_num, long s2c_byte_num, long s2c_pkt_num, long session_num, long stat_time, long vsys_id, String data_center, String device_group, String order_by, String source) {
this.c2s_byte_num = c2s_byte_num;
this.c2s_pkt_num = c2s_pkt_num;
@@ -39,7 +46,7 @@ public class CollectionValue {
}
public CollectionValue(Entity entity,String key ) {
public CollectionValue(Entity entity,String orderby,String key ) {
@@ -52,8 +59,58 @@ public class CollectionValue {
this.vsys_id = entity.getCommon_vsys_id();
this.data_center = entity.getCommon_data_center();
this.device_group = entity.getCommon_device_group();
this.order_by = key;
// this.source = source;//分情况
this.order_by = orderby;
this.source = entity.getCommon_client_ip();//分情况????????????
switch(key) {
case "common_client_ip":
this.source =entity.getCommon_client_ip();
break;
case "common_server_ip":
this.destination =entity.getCommon_server_ip();
break;
case "common_internal_ip":
this.source =entity.getCommon_internal_ip();
break;
case "common_external_ip":
this.destination =entity.getCommon_external_ip();
break;
case "common_subscriber_id":
this.subscriber_id =entity.getCommon_subscriber_id();
break;
case "common_app_label":
this.app_name =entity.getCommon_app_label();
break;
default:
}
}
public CollectionValue(Entity entity,String orderby ) {
this.c2s_byte_num = entity.getCommon_c2s_byte_num();
this.c2s_pkt_num = entity.getCommon_c2s_pkt_num();
this.s2c_byte_num = entity.getCommon_s2c_byte_num();
this.s2c_pkt_num = entity.getCommon_s2c_pkt_num();
this.session_num = entity.getCommon_sessions();
this.stat_time = System.currentTimeMillis() / 1000;
this.vsys_id = entity.getCommon_vsys_id();
this.data_center = entity.getCommon_data_center();
this.device_group = entity.getCommon_device_group();
this.order_by = orderby;
}
@@ -172,6 +229,38 @@ public class CollectionValue {
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public String getDomain() {
return domain;
}
public void setDomain(String domain) {
this.domain = domain;
}
public String getSubscriber_id() {
return subscriber_id;
}
public void setSubscriber_id(String subscriber_id) {
this.subscriber_id = subscriber_id;
}
public String getApp_name() {
return app_name;
}
public void setApp_name(String app_name) {
this.app_name = app_name;
}
@Override
public String toString() {
return "CollectionValue{" +

View File

@@ -0,0 +1,147 @@
package com.galaxy.tsg.function;
import com.galaxy.tsg.pojo.Entity;
import org.apache.datasketches.frequencies.ItemsSketch;
import org.apache.flink.api.common.functions.AggregateFunction;
import java.util.HashMap;
/**
* @author fy
* @version 1.0
* @date 2023/2/14 17:29
*
*Session_record top10000 21个窗口计算
*/
public class DatasketchForMetricsAggregate2 implements AggregateFunction<Entity, HashMap<String,DimensionItemsSketch>, HashMap<String,DimensionItemsSketch>> {
private String key ;
public DatasketchForMetricsAggregate2(String key ){
this.key = key;
}
@Override
public HashMap<String, DimensionItemsSketch> createAccumulator() {
return new HashMap<String, DimensionItemsSketch>(32768);
}
@Override
public HashMap<String, DimensionItemsSketch> add(Entity entity, HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) {
// String dimension = cnRecordLog.getCommon_client_ip();//维度
// System.out.println(dimension);
if(stringItemsSketchHashMap.isEmpty()) {
ItemsSketch<String> sessionItemsSketch = new ItemsSketch<>(32768);//新建
ItemsSketch<String> pktItemsSketch = new ItemsSketch<>(32768);//新建
ItemsSketch<String> byteItemsSketch = new ItemsSketch<>(32768);//新建
sessionItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_sessions());
pktItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num());
byteItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num());
CollectionValue sessionsCollectionValue = new CollectionValue(entity,"sessions",key);
// sessionsCollectionValue.setSource(entity.getCommon_client_ip());
HashMap<String, CollectionValue> sessionsStringCollectionValueHashMap = new HashMap<>();
sessionsStringCollectionValueHashMap.put(Dimension.setDimensionValue(entity,key),sessionsCollectionValue);
CollectionValue packetsCollectionValue = new CollectionValue(entity,"packets",key);
// packetsCollectionValue.setSource(entity.getCommon_client_ip());
HashMap<String, CollectionValue> packetsStringCollectionValueHashMap = new HashMap<>();
packetsStringCollectionValueHashMap.put(Dimension.setDimensionValue(entity,key),packetsCollectionValue);
CollectionValue bytesCollectionValue = new CollectionValue(entity,"bytes",key);
// bytesCollectionValue.setSource(entity.getCommon_client_ip());
HashMap<String, CollectionValue> bytesStringCollectionValueHashMap = new HashMap<>();
bytesStringCollectionValueHashMap.put(Dimension.setDimensionValue(entity,key),bytesCollectionValue);
DimensionItemsSketch sessionDimensionItemsSketch = new DimensionItemsSketch(Dimension.setDimensionTag(key), sessionItemsSketch,sessionsStringCollectionValueHashMap);
DimensionItemsSketch packetsDimensionItemsSketch = new DimensionItemsSketch(Dimension.setDimensionTag(key), pktItemsSketch,packetsStringCollectionValueHashMap);
DimensionItemsSketch bytesDimensionItemsSketch = new DimensionItemsSketch(Dimension.setDimensionTag(key), byteItemsSketch,bytesStringCollectionValueHashMap);
stringItemsSketchHashMap.put(key+"Session", sessionDimensionItemsSketch);
stringItemsSketchHashMap.put(key+"Packets", packetsDimensionItemsSketch);
stringItemsSketchHashMap.put(key+"Bytes", bytesDimensionItemsSketch);
}else {
stringItemsSketchHashMap.get(key+"Session").getItemsSketch().update(Dimension.setDimensionValue(entity,key),entity.getCommon_sessions());
stringItemsSketchHashMap.get(key+"Packets").getItemsSketch().update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num());
stringItemsSketchHashMap.get(key+"Bytes").getItemsSketch().update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num());
CollectionValue sessionsCollectionValue = stringItemsSketchHashMap.get(key+"Session").getStringCollectionValueHashMap().get(Dimension.setDimensionValue(entity,key));//从key获取集合
CollectionValue packetsCollectionValue = stringItemsSketchHashMap.get(key+"Packets").getStringCollectionValueHashMap().get(Dimension.setDimensionValue(entity,key));//从key获取集合
CollectionValue bytesCollectionValue = stringItemsSketchHashMap.get(key+"Bytes").getStringCollectionValueHashMap().get(Dimension.setDimensionValue(entity,key));//从key获取集合
if (sessionsCollectionValue==null){
sessionsCollectionValue = new CollectionValue(entity,"sessions",key);
sessionsCollectionValue.setSource(entity.getCommon_client_ip());
packetsCollectionValue = new CollectionValue(entity,"packets",key);
packetsCollectionValue.setSource(entity.getCommon_client_ip());
bytesCollectionValue = new CollectionValue(entity,"bytes",key);
bytesCollectionValue.setSource(entity.getCommon_client_ip());
stringItemsSketchHashMap.get(key+"Session").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),sessionsCollectionValue);
stringItemsSketchHashMap.get(key+"Packets").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),packetsCollectionValue);
stringItemsSketchHashMap.get(key+"Bytes").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),bytesCollectionValue);
}else {//做加和
sessionsCollectionValue.getCollectionValue(sessionsCollectionValue,entity);
packetsCollectionValue.getCollectionValue(packetsCollectionValue,entity);
bytesCollectionValue.getCollectionValue(bytesCollectionValue,entity);
stringItemsSketchHashMap.get(key+"Session").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),sessionsCollectionValue);
stringItemsSketchHashMap.get(key+"Packets").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),packetsCollectionValue);
stringItemsSketchHashMap.get(key+"Bytes").getStringCollectionValueHashMap().put(Dimension.setDimensionValue(entity,key),bytesCollectionValue);
}
}
return stringItemsSketchHashMap;
}
@Override
public HashMap<String, DimensionItemsSketch> getResult(HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) {
return stringItemsSketchHashMap;
}
@Override
public HashMap<String, DimensionItemsSketch> merge(HashMap<String, DimensionItemsSketch> stringDimensionItemsSketchHashMap, HashMap<String, DimensionItemsSketch> acc1) {
return null;
}
}

View File

@@ -0,0 +1,107 @@
package com.galaxy.tsg.function;
import com.galaxy.tsg.pojo.TopUrlEntity;
import com.galaxy.tsg.pojo.UrlEntity;
import org.apache.datasketches.frequencies.ItemsSketch;
import org.apache.flink.api.common.functions.AggregateFunction;
import java.util.HashMap;
/**
* @author fy
* @version 1.0
* @date 2023/2/14 17:29
*
*Session_record top10000 21个窗口计算
*/
public class DatasketchForUrlAggregate2 implements AggregateFunction<UrlEntity, HashMap<String,DimensionItemsSketch>, HashMap<String,DimensionItemsSketch>> {
@Override
public HashMap<String, DimensionItemsSketch> createAccumulator() {
return new HashMap<String, DimensionItemsSketch>(1048576);
}
@Override
public HashMap<String, DimensionItemsSketch> add(UrlEntity urlEntity, HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) {
// String dimension = cnRecordLog.getCommon_client_ip();//维度
// System.out.println(dimension);
if(stringItemsSketchHashMap.isEmpty()) {
ItemsSketch<String> urlSessionItemsSketch = new ItemsSketch<>(1048576);//新建
urlSessionItemsSketch.update(Dimension.setUrlDimension(urlEntity),urlEntity.getCommon_sessions());
TopUrlEntity topUrlEntity = new TopUrlEntity();
topUrlEntity.setSession_num(urlEntity.getCommon_sessions());
topUrlEntity.setStat_time(System.currentTimeMillis() / 1000);
topUrlEntity.setUrl(urlEntity.getHttp_url());
topUrlEntity.setVsys_id(urlEntity.getCommon_vsys_id());
HashMap<String,TopUrlEntity> stringTopUrlEntityHashMap = new HashMap<>();
stringTopUrlEntityHashMap.put(Dimension.setUrlDimension(urlEntity),topUrlEntity);
DimensionItemsSketch urlSessionDimensionItemsSketch = new DimensionItemsSketch(stringTopUrlEntityHashMap,Dimension.URL,urlSessionItemsSketch);
stringItemsSketchHashMap.put("urlSession", urlSessionDimensionItemsSketch);
}else {
stringItemsSketchHashMap.get("urlSession").getItemsSketch().update(Dimension.setUrlDimension(urlEntity),urlEntity.getCommon_sessions());
TopUrlEntity urlSession = stringItemsSketchHashMap.get("urlSession").getStringTopUrlEntityHashMap().get(Dimension.setUrlDimension(urlEntity));//从key获取集合
if (urlSession==null){
TopUrlEntity topUrlEntity = new TopUrlEntity();
topUrlEntity.setSession_num(urlEntity.getCommon_sessions());
topUrlEntity.setStat_time(System.currentTimeMillis() / 1000);
topUrlEntity.setUrl(urlEntity.getHttp_url());
topUrlEntity.setVsys_id(urlEntity.getCommon_vsys_id());
stringItemsSketchHashMap.get("urlSession").getStringTopUrlEntityHashMap().put(Dimension.setUrlDimension(urlEntity),urlSession);
}else {//做加和
urlSession.setSession_num(urlSession.getSession_num()+urlEntity.getCommon_sessions());
stringItemsSketchHashMap.get("urlSession").getStringTopUrlEntityHashMap().put(Dimension.setUrlDimension(urlEntity),urlSession);
}
}
return stringItemsSketchHashMap;
}
@Override
public HashMap<String, DimensionItemsSketch> getResult(HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap) {
return stringItemsSketchHashMap;
}
@Override
public HashMap<String, DimensionItemsSketch> merge(HashMap<String, DimensionItemsSketch> stringItemsSketchHashMap, HashMap<String, DimensionItemsSketch> acc1) {
return null;
}
}

View File

@@ -1,6 +1,5 @@
package com.galaxy.tsg.function;
import com.alibaba.fastjson.JSONObject;
import com.galaxy.tsg.pojo.*;
import org.apache.datasketches.frequencies.ErrorType;
import org.apache.datasketches.frequencies.ItemsSketch;
@@ -53,16 +52,16 @@ public class DatasketchMetricsCalculate extends ProcessWindowFunction<
for (Entity entity:iterable){
//处理session
sessionItemsSketch.update(Dimension.setDimension(entity,key),entity.getCommon_sessions());
sessionResultEntityHashMap.put(Dimension.setDimension(entity,key),enrichessionResult(context.window().getEnd() / 1000, entity));
sessionItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_sessions());
sessionResultEntityHashMap.put(Dimension.setDimensionValue(entity,key),enrichessionResult(context.window().getEnd() / 1000, entity));
//处理pkt
pktItemsSketch.update(Dimension.setDimension(entity,key),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num());
packetResultEntityHashMap.put(Dimension.setDimension(entity,key),enrichPacketResult(context.window().getEnd() / 1000, entity));
pktItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_pkt_num()+entity.getCommon_s2c_pkt_num());
packetResultEntityHashMap.put(Dimension.setDimensionValue(entity,key),enrichPacketResult(context.window().getEnd() / 1000, entity));
//处理byte
byteItemsSketch.update(Dimension.setDimension(entity,key),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num());
byteResultEntityHashMap.put(Dimension.setDimension(entity,key),enrichByteResult(context.window().getEnd() / 1000, entity));
byteItemsSketch.update(Dimension.setDimensionValue(entity,key),entity.getCommon_c2s_byte_num()+entity.getCommon_s2c_byte_num());
byteResultEntityHashMap.put(Dimension.setDimensionValue(entity,key),enrichByteResult(context.window().getEnd() / 1000, entity));
}

View File

@@ -79,7 +79,7 @@ public class Dimension {
public static String setDimension(Entity entity,String key){
public static String setDimensionValue(Entity entity, String key){
String dimension = "";
@@ -120,6 +120,44 @@ public class Dimension {
public static String setDimensionTag(String key){
String dimension = "";
switch (key) {
case "common_client_ip":
dimension = CLIENTIP;
break;
case "common_server_ip":
dimension = SERVERIP;
break;
case "common_internal_ip":
dimension = INTERNALIP;
break;
case "common_external_ip":
dimension = EXTERNALIP;
break;
case "http_domain":
dimension = DOMAIN;
break;
case "common_subscriber_id":
dimension = SUBSCRIBERID;
break;
case "common_app_label":
dimension = APPLABEL;
break;
default:
}
return dimension;
}

View File

@@ -0,0 +1,89 @@
package com.galaxy.tsg.function;
import com.alibaba.fastjson.JSONObject;
import com.galaxy.tsg.pojo.TopUrlEntity;
import org.apache.datasketches.frequencies.ErrorType;
import org.apache.datasketches.frequencies.ItemsSketch;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static com.galaxy.tsg.config.commonConfig.TOP_LIMIT;
/**
* @author fy
* @version 1.0
* @date 2023/3/7 15:50
*/
public class UserCountWindowResult6 extends ProcessWindowFunction<HashMap<String, DimensionItemsSketch>, String, Tuple4<String, Long, String, String>,TimeWindow> {
@Override
public void process(Tuple4<String, Long, String, String> stringLongStringStringTuple4, Context context, Iterable<HashMap<String, DimensionItemsSketch>> iterable, Collector<String> collector) throws Exception {
HashMap<String, DimensionItemsSketch> dataHashMap = iterable.iterator().next();
// System.out.println(dataHashMap.toString());
Set<Map.Entry<String, DimensionItemsSketch>> entries = dataHashMap.entrySet();
for (Map.Entry<String, DimensionItemsSketch> entry : entries) {
// System.out.println(entry.getKey()+"");
// stringBuilder.append(entry.getKey()+"\n");
ItemsSketch.Row<String>[] items = entry.getValue().getItemsSketch().getFrequentItems(ErrorType.NO_FALSE_POSITIVES);
for (int i = 0; i < items.length; i++) {
// String resultStr = "No." + (i + 1) + " "
// + "ip:" + items[i].getItem() + " "
// + " Est:" + items[i].getEstimate()
// + " UB:" + items[i].getUpperBound()
// + " LB:" + items[i].getLowerBound();
String jsonStr = "";
if (!entry.getKey().equals("urlSession")) {
CollectionValue collectionValue = entry.getValue().getStringCollectionValueHashMap().get(items[i].getItem());
collectionValue.setStat_time(context.window().getEnd() / 1000);
jsonStr = JSONObject.toJSONString(collectionValue);
} else {
TopUrlEntity topUrlEntity = entry.getValue().getStringTopUrlEntityHashMap().get(items[i].getItem());
topUrlEntity.setStat_time(context.window().getEnd() / 1000);
jsonStr = JSONObject.toJSONString(topUrlEntity);
}
collector.collect(jsonStr);
// String item = items[i].toString();
// stringBuilder.append(resultStr);
// stringBuilder.append("\n");
if (i == TOP_LIMIT)//够条数就结束
break;
}
}
}
}

View File

@@ -0,0 +1,87 @@
package com.galaxy.tsg.function;
import com.alibaba.fastjson.JSONObject;
import com.galaxy.tsg.pojo.TopUrlEntity;
import org.apache.datasketches.frequencies.ErrorType;
import org.apache.datasketches.frequencies.ItemsSketch;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static com.galaxy.tsg.config.commonConfig.TOP_LIMIT;
/**
* @author fy
* @version 1.0
* @date 2023/3/7 15:50
*/
public class UserCountWindowResult7 extends ProcessWindowFunction<HashMap<String, DimensionItemsSketch>, String, Tuple2<String, Long>,TimeWindow> {
@Override
public void process(Tuple2<String, Long> stringLongTuple2, Context context, Iterable<HashMap<String, DimensionItemsSketch>> iterable, Collector<String> collector) throws Exception {
HashMap<String, DimensionItemsSketch> dataHashMap = iterable.iterator().next();
// System.out.println(dataHashMap.toString());
Set<Map.Entry<String, DimensionItemsSketch>> entries = dataHashMap.entrySet();
for (Map.Entry<String, DimensionItemsSketch> entry : entries) {
// System.out.println(entry.getKey()+"");
// stringBuilder.append(entry.getKey()+"\n");
ItemsSketch.Row<String>[] items = entry.getValue().getItemsSketch().getFrequentItems(ErrorType.NO_FALSE_POSITIVES);
for (int i = 0; i < items.length; i++) {
// String resultStr = "No." + (i + 1) + " "
// + "ip:" + items[i].getItem() + " "
// + " Est:" + items[i].getEstimate()
// + " UB:" + items[i].getUpperBound()
// + " LB:" + items[i].getLowerBound();
String jsonStr = "";
if (!entry.getKey().equals("urlSession")) {
CollectionValue collectionValue = entry.getValue().getStringCollectionValueHashMap().get(items[i].getItem());
collectionValue.setStat_time(context.window().getEnd() / 1000);
jsonStr = JSONObject.toJSONString(collectionValue);
} else {
TopUrlEntity topUrlEntity = entry.getValue().getStringTopUrlEntityHashMap().get(items[i].getItem());
topUrlEntity.setStat_time(context.window().getEnd() / 1000);
jsonStr = JSONObject.toJSONString(topUrlEntity);
}
collector.collect(jsonStr);
// String item = items[i].toString();
// stringBuilder.append(resultStr);
// stringBuilder.append("\n");
if (i == TOP_LIMIT)//够条数就结束
break;
}
}
}
}