package com.zdjizhi.function;

import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.utils.StringUtil;
import com.zdjizhi.common.DosSketchLog;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;

/**
 * Flattens one JSON sketch record into one {@link DosSketchLog} per entry of its
 * {@code report_ip_list} array.
 *
 * <p>The record-level fields {@code sketch_start_time}, {@code sketch_duration},
 * {@code attack_type} and {@code common_vsys_id} are copied onto every emitted log;
 * {@code common_vsys_id} falls back to {@code 1} when the key is absent. Each list entry
 * supplies {@code source_ip}, {@code destination_ip}, {@code sketch_sessions},
 * {@code sketch_packets} and {@code sketch_bytes}.
 */
public class FlatSketchFunction implements FlatMapFunction<String, DosSketchLog> {

    private static final Logger logger = LoggerFactory.getLogger(FlatSketchFunction.class);

    /**
     * Parses {@code value} as JSON and collects one {@link DosSketchLog} for every element of
     * {@code report_ip_list}.
     *
     * <p>Input for which {@code StringUtil.isNotBlank} returns {@code false} is skipped without
     * logging. Any exception raised while parsing (missing key, non-numeric value, malformed
     * JSON, absent list) is caught and logged at error level together with the raw input; it is
     * never rethrown. Logs collected before the failing entry stay emitted, the remaining
     * entries of that record are dropped.
     *
     * @param value raw JSON text of one sketch record
     * @param out   collector receiving the flattened logs
     */
    @Override
    public void flatMap(String value, Collector<DosSketchLog> out) {
        try {
            if (StringUtil.isNotBlank(value)) {
                // Receive time is the processing wall clock, truncated to whole seconds.
                final long recv_time = System.currentTimeMillis() / 1000;

                // parseObject with a class token yields a raw HashMap; keys are JSON object
                // names and therefore strings.
                @SuppressWarnings("unchecked")
                HashMap<String, Object> sketchSource = JSONObject.parseObject(value, HashMap.class);

                long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
                long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
                String attackType = sketchSource.get("attack_type").toString();
                int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());

                // The nested list is re-serialised and parsed again so that it is obtained as
                // an ArrayList of maps regardless of how the outer parse represented it.
                String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list"));
                @SuppressWarnings("unchecked")
                ArrayList<HashMap<String, Object>> reportIpList =
                        JSONObject.parseObject(report_ip_list, ArrayList.class);

                for (HashMap<String, Object> obj : reportIpList) {
                    DosSketchLog dosSketchLog = new DosSketchLog();

                    // Fields shared by every entry of this record.
                    dosSketchLog.setCommon_recv_time(recv_time);
                    dosSketchLog.setSketch_start_time(sketchStartTime);
                    dosSketchLog.setSketch_duration(sketchDuration);
                    dosSketchLog.setAttack_type(attackType);
                    dosSketchLog.setVsys_id(vsysId);

                    // Per-entry fields.
                    String sourceIp = obj.get("source_ip").toString();
                    String destinationIp = obj.get("destination_ip").toString();
                    long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
                    long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
                    long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());

                    dosSketchLog.setSource_ip(sourceIp);
                    dosSketchLog.setDestination_ip(destinationIp);
                    dosSketchLog.setSketch_sessions(sketchSessions);
                    dosSketchLog.setSketch_packets(sketchPackets);
                    dosSketchLog.setSketch_bytes(sketchBytes);

                    out.collect(dosSketchLog);
                    logger.debug("数据解析成功:{}", dosSketchLog);
                }
            }
        } catch (Exception e) {
            // Deliberately broad: a single bad record must not fail the operator.
            logger.error("数据解析错误:{} \n{}", value, e);
        }
    }
}