package com.zdjizhi.etl;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.source.DosSketchSource;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;

/**
 * Parses raw sketch JSON strings from {@link DosSketchSource} into
 * {@link DosSketchLog} records, one per entry of the message's
 * {@code report_ip_list}, and attaches event-time watermarks to the result.
 *
 * @author wlh
 */
public class ParseSketchLog {

    private static final Log logger = LogFactory.get();

    private static final JsonMapper jsonMapperInstance = JsonMapper.getInstance();

    /** Target type used to decode a whole sketch message. */
    private static final JavaType hashmapJsonType =
            jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);

    /** Target type used to decode the {@code report_ip_list} field. */
    private static final JavaType listType =
            jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);

    /**
     * Builds the parsed sketch stream with timestamps and watermarks assigned.
     *
     * @return stream of {@link DosSketchLog} records with the watermark strategy
     *         from {@link #createWatermarkStrategy()} applied
     */
    public static SingleOutputStreamOperator<DosSketchLog> getSketchSource() {
        return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy());
    }

    /**
     * Flat-maps the raw source stream through {@link FlatSketchLog}.
     *
     * @return stream of parsed {@link DosSketchLog} records, without watermarks
     */
    private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource() {
        return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog());
    }

    /**
     * Creates a bounded-out-of-orderness watermark strategy whose tolerance is
     * {@code CommonConfig.FLINK_WATERMARK_MAX_ORDERNESS} seconds.
     *
     * @return the watermark strategy for {@link DosSketchLog} events
     */
    private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy() {
        return WatermarkStrategy
                .<DosSketchLog>forBoundedOutOfOrderness(
                        Duration.ofSeconds(CommonConfig.FLINK_WATERMARK_MAX_ORDERNESS))
                // NOTE(review): the * 1000 presumably converts seconds to the
                // milliseconds Flink expects - confirm the unit of sketch_start_time.
                .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
    }

    /**
     * Expands one sketch JSON message into one {@link DosSketchLog} per entry
     * of its {@code report_ip_list}.
     */
    private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {

        /**
         * Parses {@code s} and emits one record per report entry. Blank input
         * emits nothing. Any exception raised while parsing is logged and
         * swallowed; records collected before the failure stay emitted and the
         * remaining entries of that message are dropped.
         *
         * @param s         raw JSON message
         * @param collector sink for the parsed records
         */
        @Override
        public void flatMap(String s, Collector<DosSketchLog> collector) {
            try {
                if (StringUtil.isNotBlank(s)) {
                    HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);

                    // Message-level fields, copied onto every emitted record.
                    long sketchStartTime = longField(sketchSource, "sketch_start_time");
                    long sketchDuration = longField(sketchSource, "sketch_duration");
                    String attackType = sketchSource.get("attack_type").toString();
                    // vsys id falls back to 1 when the key is absent.
                    int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());

                    // Round-trip the nested value through JSON to decode it as a list of maps.
                    ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(
                            jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);

                    for (HashMap<String, Object> obj : reportIpList) {
                        DosSketchLog dosSketchLog = new DosSketchLog();
                        dosSketchLog.setSketch_start_time(sketchStartTime);
                        dosSketchLog.setSketch_duration(sketchDuration);
                        dosSketchLog.setAttack_type(attackType);
                        dosSketchLog.setVsys_id(vsysId);

                        // Read and convert every per-entry field before touching the
                        // record, so a malformed entry fails before any setter runs.
                        String sourceIp = obj.get("source_ip").toString();
                        String destinationIp = obj.get("destination_ip").toString();
                        long sketchSessions = longField(obj, "sketch_sessions");
                        long sketchPackets = longField(obj, "sketch_packets");
                        long sketchBytes = longField(obj, "sketch_bytes");

                        dosSketchLog.setSource_ip(sourceIp);
                        dosSketchLog.setDestination_ip(destinationIp);
                        dosSketchLog.setSketch_sessions(sketchSessions);
                        dosSketchLog.setSketch_packets(sketchPackets);
                        dosSketchLog.setSketch_bytes(sketchBytes);

                        collector.collect(dosSketchLog);
                        // Pass the object itself so toString() is left to the logger.
                        logger.debug("数据解析成功:{}", dosSketchLog);
                    }
                }
            } catch (Exception e) {
                // Hand the exception over as the Throwable as well, so the stack
                // trace is logged and not only e.toString().
                logger.error(e, "数据解析错误:{} \n{}", s, e);
            }
        }

        /**
         * Reads {@code key} from {@code source} and parses its string form as a long.
         *
         * @throws NullPointerException  if the key is absent or maps to null
         * @throws NumberFormatException if the value is not a valid long
         */
        private static long longField(HashMap<String, Object> source, String key) {
            return Long.parseLong(source.get(key).toString());
        }
    }

    /**
     * Debug entry point: prints the parsed (un-watermarked) stream and runs
     * the shared Flink execution environment.
     */
    public static void main(String[] args) throws Exception {
        flatSketchSource().print();
        FlinkEnvironmentUtils.streamExeEnv.execute();
    }
}