修改计算速率方式,使用session总数除以时间窗口

This commit is contained in:
wanglihui
2021-09-13 09:46:02 +08:00
parent 81f6499458
commit 4f8807dfa1

View File

@@ -80,7 +80,9 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
}
}
String sourceIpList = StringUtils.join(sourceIpSet, ",");
return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration);
// return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration);
return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME,
bytes*8/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
}catch (Exception e){
logger.error("聚合中间结果集失败 {}",e);
}