diff --git a/src/main/java/com/galaxy/tsg/function/metricsAggregation.java b/src/main/java/com/galaxy/tsg/function/metricsAggregation.java new file mode 100644 index 0000000..dd64388 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/metricsAggregation.java @@ -0,0 +1,57 @@ +package com.galaxy.tsg.function; + +import com.galaxy.tsg.pojo.Entity; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.java.tuple.Tuple1; + +import java.util.HashMap; + +public class metricsAggregation implements AggregateFunction>, Tuple1>> { + + private final int topSize; + + public metricsAggregation(int i) { + + this.topSize = i; + + } + + @Override + public Tuple1> createAccumulator() { + + + return Tuple1.of(new HashMap()); + } + + @Override + public Tuple1> add(Entity value, Tuple1> accumulator) { + + + if(accumulator.f0.containsKey(value.getKey_by())) { + + Entity value1= accumulator.f0.get(value.getKey_by()); + value1.setCommon_c2s_pkt_num(value1.getCommon_c2s_pkt_num() + value.getCommon_c2s_pkt_num()); + value1.setCommon_s2c_pkt_num(value1.getCommon_s2c_pkt_num() + value.getCommon_s2c_pkt_num()); + value1.setCommon_c2s_byte_num(value1.getCommon_c2s_byte_num() + value.getCommon_c2s_byte_num()); + value1.setCommon_s2c_byte_num(value1.getCommon_s2c_byte_num() + value.getCommon_s2c_byte_num()); + value1.setCommon_sessions(value1.getCommon_sessions() + value.getCommon_sessions()); + } + else { + if(accumulator.f0.size()> getResult(Tuple1> accumulator) { + return accumulator; + } + + @Override + public Tuple1> merge(Tuple1> a, Tuple1> b) { + return null; + } +} diff --git a/src/main/java/com/galaxy/tsg/function/metricsCalculateTest.java b/src/main/java/com/galaxy/tsg/function/metricsCalculateTest.java new file mode 100644 index 0000000..417fd60 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/metricsCalculateTest.java @@ -0,0 +1,110 @@ +package com.galaxy.tsg.function; + +import com.galaxy.tsg.pojo.*; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +public class metricsCalculateTest extends ProcessWindowFunction< + Tuple1>, // 输入类型 + ResultEntity, // 输出类型 + Tuple4, // 键类型 + TimeWindow> { // 窗口类型 + private final int topSize; + private final String key; + // Set allSet = new TreeSet() ; + private PriorityQueue sessionOrderEntity ; + private PriorityQueue packetOrderEntity ; + private PriorityQueue byteOrderEntity ; + + public metricsCalculateTest(int i, String key) { + this.key = key; + this.topSize = i; + this.sessionOrderEntity=new PriorityQueue<>(); + this.packetOrderEntity=new PriorityQueue<>(); + this.byteOrderEntity=new PriorityQueue<>(); + } + + @Override + public void process(Tuple4 s, + Context context, + Iterable>> elements, Collector out) throws Exception { + + + + + + for(Tuple1> objectEntity : elements) { + + for (Map.Entry entry: objectEntity.f0.entrySet()){ + + + + ByteResultEntity testEntity = enrichByteResult(context.window().getEnd() / 1000,entry.getValue()); + + ResultEntity resultEntity = new ResultEntity(); + resultEntity.setOrder_by(""); + resultEntity.setByteResultEntity(testEntity); + resultEntity.setStat_time(context.window().getEnd() / 1000); + out.collect(resultEntity); + + + } + + } + + + } + + public ByteResultEntity enrichByteResult(Long time,Entity objectEntity) { + ByteResultEntity en = new ByteResultEntity(); + en.setVsys_id(objectEntity.getCommon_vsys_id()); + en.setStat_time(time); + en.setSource(objectEntity.getCommon_client_ip()); + en.setSession_num(objectEntity.getCommon_sessions()); + en.setOrder_by(""); + en.setDevice_group(objectEntity.getCommon_device_group()); + en.setData_center(objectEntity.getCommon_data_center()); + en.setS2c_pkt_num(objectEntity.getCommon_s2c_pkt_num()); + en.setC2s_pkt_num(objectEntity.getCommon_c2s_pkt_num()); + en.setS2c_byte_num(objectEntity.getCommon_s2c_byte_num()); + en.setC2s_byte_num(objectEntity.getCommon_c2s_byte_num()); + switch (key) { + case "common_client_ip": + en.setSource(objectEntity.getCommon_client_ip()); + break; + case "common_server_ip": + en.setDestination(objectEntity.getCommon_server_ip()); + break; + case "common_internal_ip": + en.setSource(objectEntity.getCommon_internal_ip()); + break; + case "common_external_ip": + en.setDestination(objectEntity.getCommon_external_ip()); + break; + case "http_domain": + en.setDomain(objectEntity.getHttp_domain()); + break; + + case "common_subscriber_id": + en.setSubscriber_id(objectEntity.getCommon_subscriber_id()); + break; + + case "common_app_label": + en.setApp_name(objectEntity.getCommon_app_label()); + break; + + default: + + } + return en; + + } + +} \ No newline at end of file