TSG-17517 Data latency greater than ten minutes

wangkuan
2023-11-02 19:21:09 +08:00
parent dfc46ab931
commit 6e683191c2
5 changed files with 40 additions and 156 deletions

View File

@@ -6,7 +6,7 @@
<groupId>com.galaxy.tsg</groupId>
<artifactId>topn-metrics-job</artifactId>
-<version>23-09-14</version>
+<version>23-11-01</version>
<repositories>
<repository>
@@ -181,7 +181,7 @@
</goals>
<configuration>
-<finalName>topn-metrics-job-23-09-14</finalName>
+<finalName>topn-metrics-job-23-11-01</finalName>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.

View File

@@ -109,7 +109,7 @@ public class Toptask {
SingleOutputStreamOperator<resultEntity> windowedStream = clientipdStream.keyBy(new groupBySelector("client_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
-.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "client_ip")).setParallelism(TASK_PARALLELISM).name("client_ip");
+.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("client_ip");
DataStream<String> Stream = windowedStream.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
Stream.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
@@ -127,7 +127,7 @@ public class Toptask {
SingleOutputStreamOperator<resultEntity> windowedStreamForServerIp = serveripdStream.keyBy(new groupBySelector("server_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
-.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "server_ip")).setParallelism(TASK_PARALLELISM).name("server_ip");
+.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_ip");
DataStream<String> StreamForServerIp = windowedStreamForServerIp.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForServerIp.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
@@ -143,7 +143,7 @@ public class Toptask {
SingleOutputStreamOperator<resultEntity> windowedStreamForInternal = internalStream.keyBy(new groupBySelector("internal_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
-.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "internal_ip")).setParallelism(TASK_PARALLELISM).name("internal_ip");
+.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("internal_ip");
DataStream<String> StreamForInternal = windowedStreamForInternal.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForInternal.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
@@ -159,7 +159,7 @@ public class Toptask {
SingleOutputStreamOperator<resultEntity> windowedStreamForExternal = externalStream.keyBy(new groupBySelector("external_ip"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
-.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "external_ip")).setParallelism(TASK_PARALLELISM).name("external_ip");
+.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("external_ip");
DataStream<String> StreamForExternal = windowedStreamForExternal.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForExternal.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
@@ -175,7 +175,7 @@ public class Toptask {
SingleOutputStreamOperator<resultEntity> windowedStreamForDomain = domainStream.keyBy(new groupBySelector("server_domain"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
-.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "server_domain")).setParallelism(TASK_PARALLELISM).name("server_domain");
+.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_domain");
DataStream<String> StreamForDomain = windowedStreamForDomain.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForDomain.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
@@ -190,7 +190,7 @@ public class Toptask {
//TOP aggregation keyed by common_subscriber_id
SingleOutputStreamOperator<resultEntity> windowedStreamForUser = userStream.keyBy(new groupBySelector("subscriber_id"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
-.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "subscriber_id")).setParallelism(TASK_PARALLELISM).name("subscriber_id");
+.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("subscriber_id");
DataStream<String> StreamForUser = windowedStreamForUser.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForUser.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
@@ -204,7 +204,7 @@ public class Toptask {
SingleOutputStreamOperator<resultEntity> windowedStreamForFqdn = fqdnStream.keyBy(new groupBySelector("server_fqdn"))
.window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
-.reduce(new metricsAggregationReduce(), new metricsCalculate(TOP_LIMIT, "server_fqdn")).setParallelism(TASK_PARALLELISM).name("server_fqdn");
+.reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_fqdn");
DataStream<String> StreamForFqdn = windowedStreamForFqdn.keyBy(new oneKeySelector())
.process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
StreamForFqdn.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
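
A note on the Toptask changes above: all seven dimension pipelines (client_ip, server_ip, internal_ip, external_ip, server_domain, subscriber_id, server_fqdn) receive the same one-line edit, replacing new metricsCalculate(TOP_LIMIT, key) with the no-arg new metricsCalculate() and leaving top-N selection entirely to the downstream topnHotItems. A minimal sketch of the shared shape after this commit, reusing the classes and constants from this file; the buildTopnPipeline wrapper itself is hypothetical and not part of the commit:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Hypothetical helper illustrating the pattern every dimension stream follows.
private static void buildTopnPipeline(DataStream<transformEntity> source, String dimension) {
    SingleOutputStreamOperator<resultEntity> windowed = source
            .keyBy(new groupBySelector(dimension))
            .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
            // metricsCalculate no longer ranks per key; it only enriches and
            // forwards the reduced element, so results leave the window at once.
            .reduce(new metricsAggregationReduce(), new metricsCalculate())
            .setParallelism(TASK_PARALLELISM).name(dimension);
    windowed.keyBy(new oneKeySelector())
            .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM)
            .addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
}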

View File

@@ -6,159 +6,44 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.PriorityQueue;
public class metricsCalculate extends ProcessWindowFunction<
transformEntity, // input type
resultEntity, // output type
Tuple5<String, Long, String, String, String>, // key type
TimeWindow> { // window type
private final int topSize;
private final String key;
// Set<ObjectEntity> allSet = new TreeSet<ObjectEntity>() ;
private PriorityQueue<sessionResultEntity> sessionOrderEntity;
private PriorityQueue<packetResultEntity> packetOrderEntity;
private PriorityQueue<byteResultEntity> byteOrderEntity;
private long windowStartTimestamp;
public metricsCalculate(int i, String key) {
this.key = key;
this.topSize = i;
this.sessionOrderEntity = new PriorityQueue<>();
this.packetOrderEntity = new PriorityQueue<>();
this.byteOrderEntity = new PriorityQueue<>();
}
@Override
public void process(Tuple5<String, Long, String, String, String> s,
Context context,
Iterable<transformEntity> elements, Collector<resultEntity> out) throws Exception {
if (context.window().getStart() == windowStartTimestamp) {
if (elements.iterator().hasNext()) {
transformEntity objectTransformEntity = elements.iterator().next();
resultEntity enSession = new resultEntity();
enSession.setOrder_by("sessions");
enSession.setStat_time(context.window().getStart());
enSession.setSessionResultEntity(enrichessionResult(context.window().getStart(), objectTransformEntity));
out.collect(enSession);
resultEntity enPacket = new resultEntity();
enPacket.setOrder_by("packets");
enPacket.setStat_time(context.window().getStart());
enPacket.setPacketResultEntity(enrichPacketResult(context.window().getStart() , objectTransformEntity));
out.collect(enPacket);
resultEntity enbyte = new resultEntity();
enbyte.setOrder_by("bytes");
enbyte.setStat_time(context.window().getStart());
enbyte.setByteResultEntity(enrichByteResult(context.window().getStart(), objectTransformEntity));
out.collect(enbyte);
if (elements.iterator().hasNext()) {
transformEntity objectTransformEntity = elements.iterator().next();
if (sessionOrderEntity.size() < topSize) {
sessionOrderEntity.add(enrichessionResult(context.window().getStart(), objectTransformEntity));
} else {
if (sessionOrderEntity.peek() != null) {
sessionResultEntity res = sessionOrderEntity.peek();
if (res.getSessions() <= objectTransformEntity.getSessions()) {
sessionOrderEntity.poll();
sessionOrderEntity.add(enrichessionResult(context.window().getStart(), objectTransformEntity));
}
}
}
if (packetOrderEntity.size() < topSize) {
packetOrderEntity.add(enrichPacketResult(context.window().getStart(), objectTransformEntity));
} else {
if (packetOrderEntity.peek() != null) {
packetResultEntity res = packetOrderEntity.peek();
if ((res.getIn_pkts() + res.getOut_pkts()) <= (objectTransformEntity.getIn_pkts() + objectTransformEntity.getOut_pkts())) {
packetOrderEntity.poll();
packetOrderEntity.add(enrichPacketResult(context.window().getStart() , objectTransformEntity));
}
}
}
if (byteOrderEntity.size() < topSize) {
byteOrderEntity.add(enrichByteResult(context.window().getStart(), objectTransformEntity));
} else {
if (byteOrderEntity.peek() != null) {
byteResultEntity res = byteOrderEntity.peek();
if ((res.getIn_bytes() + res.getOut_bytes()) <= (objectTransformEntity.getIn_bytes() + objectTransformEntity.getOut_bytes())) {
byteOrderEntity.poll();
byteOrderEntity.add(enrichByteResult(context.window().getStart(), objectTransformEntity));
}
}
}
}
} else {
for (com.galaxy.tsg.pojo.sessionResultEntity sessionResultEntity : sessionOrderEntity) {
resultEntity en = new resultEntity();
en.setOrder_by("sessions");
en.setStat_time(windowStartTimestamp);
en.setSessionResultEntity(sessionResultEntity);
out.collect(en);
}
sessionOrderEntity.clear();
for (com.galaxy.tsg.pojo.packetResultEntity packetResultEntity : packetOrderEntity) {
resultEntity en = new resultEntity();
en.setOrder_by("packets");
en.setStat_time(windowStartTimestamp);
en.setPacketResultEntity(packetResultEntity);
out.collect(en);
}
packetOrderEntity.clear();
for (com.galaxy.tsg.pojo.byteResultEntity byteResultEntity : byteOrderEntity) {
resultEntity en = new resultEntity();
en.setOrder_by("bytes");
en.setStat_time(windowStartTimestamp);
en.setByteResultEntity(byteResultEntity);
out.collect(en);
}
byteOrderEntity.clear();
windowStartTimestamp = context.window().getStart();
if (elements.iterator().hasNext()) {
transformEntity objectTransformEntity = elements.iterator().next();
if (sessionOrderEntity.size() < topSize) {
sessionOrderEntity.add(enrichessionResult(context.window().getStart(), objectTransformEntity));
} else {
if (sessionOrderEntity.peek() != null) {
sessionResultEntity res = sessionOrderEntity.peek();
if (res.getSessions() <= objectTransformEntity.getSessions()) {
sessionOrderEntity.poll();
sessionOrderEntity.add(enrichessionResult(context.window().getStart(), objectTransformEntity));
}
}
}
if (packetOrderEntity.size() < topSize) {
packetOrderEntity.add(enrichPacketResult(context.window().getStart(), objectTransformEntity));
} else {
if (packetOrderEntity.peek() != null) {
packetResultEntity res = packetOrderEntity.peek();
if ((res.getIn_pkts() + res.getOut_pkts()) <= (objectTransformEntity.getIn_pkts() + objectTransformEntity.getOut_pkts())) {
packetOrderEntity.poll();
packetOrderEntity.add(enrichPacketResult(context.window().getStart() , objectTransformEntity));
}
}
}
if (byteOrderEntity.size() < topSize) {
byteOrderEntity.add(enrichByteResult(context.window().getStart(), objectTransformEntity));
} else {
if (byteOrderEntity.peek() != null) {
byteResultEntity res = byteOrderEntity.peek();
if ((res.getIn_bytes() + res.getOut_bytes()) <= (objectTransformEntity.getIn_bytes() + objectTransformEntity.getOut_bytes())) {
byteOrderEntity.poll();
byteOrderEntity.add(enrichByteResult(context.window().getStart(), objectTransformEntity));
}
}
}
}
}
}
public byteResultEntity enrichByteResult(Long time, transformEntity objectTransformEntity) {
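
The deleted block above is the likely source of the ticket's delay: the old process() buffered each key's results in operator-field PriorityQueues and only flushed a window's entries when an element from a later window arrived (the windowStartTimestamp comparison), so every window's output lagged at least one further window behind, and keys that went quiet could hold results back indefinitely. The retained lines that collect enSession/enPacket/enbyte directly suggest the new body simply enriches and emits as soon as the window fires. A sketch of that shape, reusing names from the old code; the exact new body is not shown in this diff:

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class metricsCalculate extends ProcessWindowFunction<
        transformEntity, resultEntity,
        Tuple5<String, Long, String, String, String>, TimeWindow> {

    @Override
    public void process(Tuple5<String, Long, String, String, String> key,
                        Context context,
                        Iterable<transformEntity> elements,
                        Collector<resultEntity> out) {
        if (!elements.iterator().hasNext()) {
            return; // reduce() leaves at most one element per key and window
        }
        transformEntity t = elements.iterator().next();
        long windowStart = context.window().getStart();
        // Emit one enriched record per metric the moment the window fires;
        // no cross-window state, so nothing waits for a later window.
        resultEntity enSession = new resultEntity();
        enSession.setOrder_by("sessions");
        enSession.setStat_time(windowStart);
        enSession.setSessionResultEntity(enrichessionResult(windowStart, t));
        out.collect(enSession);
        resultEntity enPacket = new resultEntity();
        enPacket.setOrder_by("packets");
        enPacket.setStat_time(windowStart);
        enPacket.setPacketResultEntity(enrichPacketResult(windowStart, t));
        out.collect(enPacket);
        resultEntity enByte = new resultEntity();
        enByte.setOrder_by("bytes");
        enByte.setStat_time(windowStart);
        enByte.setByteResultEntity(enrichByteResult(windowStart, t));
        out.collect(enByte);
    }
    // enrichessionResult / enrichPacketResult / enrichByteResult unchanged ...
}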

View File

@@ -13,7 +13,6 @@ import java.util.PriorityQueue;
public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEntity, String> {
private final int topSize;
-// Set<ObjectEntity> allSet = new TreeSet<ObjectEntity>() ;
private PriorityQueue<sessionResultEntity> sessionOrderEntity ;
private PriorityQueue<packetResultEntity> packetOrderEntity ;
private PriorityQueue<byteResultEntity> byteOrderEntity ;
@@ -89,7 +88,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
}
-//Register an EventTime timer at windowEnd + 1; when it fires, all item data for windowEnd has been collected
+//Register an EventTime timer at windowEnd + 1; when it fires, all the data has been collected
context.timerService().registerEventTimeTimer(objectEntity.getStat_time() + 1);
}
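
For context, the timer registered above is what finally releases each window's top-N: the queues fill as upstream window results arrive, and the flush happens once the watermark passes stat_time + 1. A generic sketch of that pattern; the project's actual processElement and onTimer bodies are not shown in this diff, and buffer/emitTopN are hypothetical helpers:

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only; mirrors the comment above, not the project's exact code.
public class topnHotItemsSketch extends KeyedProcessFunction<Tuple1<String>, resultEntity, String> {
    @Override
    public void processElement(resultEntity value, Context ctx, Collector<String> out) {
        buffer(value); // push into the matching sessions/packets/bytes queue
        // Fires once the watermark passes stat_time + 1, i.e. once every
        // upstream window result for this timestamp has been received.
        ctx.timerService().registerEventTimeTimer(value.getStat_time() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        emitTopN(out); // drain the queues, emit the top entries, then clear them
    }

    private void buffer(resultEntity value) { /* hypothetical helper */ }
    private void emitTopN(Collector<String> out) { /* hypothetical helper */ }
}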

View File

@@ -1,18 +1,18 @@
#--------------------------------Kafka consumer settings------------------------------#
#Kafka broker address
-kafka.consumer.broker=192.168.44.12:9094
+kafka.consumer.broker=192.168.44.11:9092
#topic to consume
kafka.consumer.topic=SESSION-RECORD-COMPLETED
#consumer group
-kafka.consumer.group.id=topn-metrics-job-20230501
+kafka.consumer.group.id=topn-metrics-job-20231101-t1
#--------------------------------Kafka producer settings------------------------------#
#Kafka broker address
-kafka_producer_broker=192.168.44.12:9094
+kafka_producer_broker=192.168.44.12:9092
-kafka.producer.topic=TRAFFIC-TOP-METRICS
+kafka.producer.topic=TRAFFIC-TOP-METRIC
#--------------------------------topology settings------------------------------#
#parallelism settings
@@ -50,7 +50,7 @@ kafka.consumer.session.timeout.ms=60000
kafka.consumer.max.partition.fetch.bytes=31457280
#whether the Kafka consumer enables security: 0 none, 1 SSL, 2 SASL
-kafka.consumer.security=2
+kafka.consumer.security=0
#Kafka SASL username
kafka.consumer.user=admin
@@ -62,7 +62,7 @@ kafka.consumer.pin=galaxy2019
tools.consumer.library=/home/bigdata/topology/dat/
#whether the Kafka producer enables security: 0 none, 1 SSL, 2 SASL
-kafka.producer.security=2
+kafka.producer.security=0
#Kafka SASL username
kafka.producer.user=admin
@@ -74,7 +74,7 @@ kafka.producer.pin=galaxy2019
tools.producer.library=/home/bigdata/topology/dat/
#number of producer retries
-kafka.producer.retries=0
+kafka.producer.retries=1
#how many milliseconds after a Batch is created before it is sent out, even if the Batch is not yet full
kafka.producer.linger.ms=1
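
Summing up the config diff: the consumer moves to broker 192.168.44.11:9092 and the producer to 192.168.44.12:9092, security drops from SASL (2) to none (0), the consumer group is renamed (presumably so the redeployed job reads with fresh offsets), the output topic becomes TRAFFIC-TOP-METRIC, and the producer now retries once. A hedged illustration of how such keys conventionally map onto Kafka client settings; the job's own property-loading code is not part of this diff:

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;

// Illustration only; values mirror the new properties above.
public class ProducerSettingsSketch {
    static Properties producerProps() {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.44.12:9092"); // kafka_producer_broker
        p.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");     // kafka.producer.security=0
        p.put(ProducerConfig.RETRIES_CONFIG, 1);                              // kafka.producer.retries
        p.put(ProducerConfig.LINGER_MS_CONFIG, 1);                            // kafka.producer.linger.ms
        return p;
    }
}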