fix:recv_time fill value error
This commit is contained in:
@@ -6,7 +6,7 @@ import com.zdjizhi.common.FlowWriteConfig;
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.apache.flink.api.java.tuple.Tuple6;
|
||||
import org.apache.flink.api.java.tuple.Tuple7;
|
||||
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
||||
import org.apache.flink.util.Collector;
|
||||
@@ -44,7 +44,7 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
|
||||
private DosSketchLog getMiddleResult(Tuple3<String,String,Integer> keys,Iterable<DosSketchLog> elements){
|
||||
|
||||
DosSketchLog midResuleLog = new DosSketchLog();
|
||||
Tuple6<Long, Long, Long,String,Long,Long> values = sketchAggregate(elements);
|
||||
Tuple7<Long, Long, Long,String,Long,Long,Long> values = sketchAggregate(elements);
|
||||
try {
|
||||
if (values != null){
|
||||
midResuleLog.setAttack_type(keys.f0);
|
||||
@@ -56,6 +56,7 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
|
||||
midResuleLog.setSketch_sessions(values.f0);
|
||||
midResuleLog.setSketch_packets(values.f1);
|
||||
midResuleLog.setSketch_bytes(values.f2);
|
||||
midResuleLog.setCommon_recv_time(values.f6);
|
||||
return midResuleLog;
|
||||
}
|
||||
} catch (Exception e){
|
||||
@@ -64,16 +65,23 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
|
||||
return null;
|
||||
}
|
||||
|
||||
private Tuple6<Long, Long, Long,String,Long,Long> sketchAggregate(Iterable<DosSketchLog> elements){
|
||||
private Tuple7<Long, Long, Long,String,Long,Long,Long> sketchAggregate(Iterable<DosSketchLog> elements){
|
||||
long sessions = 0;
|
||||
long packets = 0 ;
|
||||
long bytes = 0;
|
||||
long startTime = System.currentTimeMillis()/1000;
|
||||
long endTime = System.currentTimeMillis()/1000;
|
||||
long duration = 0;
|
||||
long recvtime = 0;
|
||||
HashSet<String> sourceIpSet = new HashSet<>();
|
||||
try {
|
||||
for (DosSketchLog newSketchLog : elements){
|
||||
if (recvtime == 0){
|
||||
recvtime = newSketchLog.getCommon_recv_time();
|
||||
}else if (recvtime > newSketchLog.getCommon_recv_time()){
|
||||
recvtime = newSketchLog.getCommon_recv_time();
|
||||
}
|
||||
System.out.println(newSketchLog.getCommon_recv_time());
|
||||
String sourceIp = newSketchLog.getSource_ip();
|
||||
if (StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV6)){
|
||||
sessions += newSketchLog.getSketch_sessions();
|
||||
@@ -88,9 +96,10 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String sourceIpList = StringUtils.join(sourceIpSet, ",");
|
||||
return Tuple6.of(sessions/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,packets/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,
|
||||
bytes*8/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
|
||||
return Tuple7.of(sessions/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,packets/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,
|
||||
bytes*8/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration,recvtime);
|
||||
}catch (Exception e){
|
||||
logger.error("聚合中间结果集失败 {}",e);
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ public class ParseSketchLog {
|
||||
try {
|
||||
if (StringUtil.isNotBlank(s)) {
|
||||
|
||||
final long recv_time = System.currentTimeMillis();
|
||||
final long recv_time = System.currentTimeMillis()/1000;
|
||||
|
||||
HashMap<String, Object> sketchSource = JSONObject.parseObject(s, HashMap.class);
|
||||
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
|
||||
|
||||
Reference in New Issue
Block a user