package com.zdjizhi.function;

import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CustomFile;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.etl.ParseSketchLog;
import com.zdjizhi.utils.IpUtils;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Broadcast process function that turns sketch-log JSON strings from the main stream into
 * {@link DosSketchLog} records, and hands every broadcast element to
 * {@link IpUtils#updateIpLook}.
 *
 * <p>Each input string is expected to be a JSON object carrying {@code sketch_start_time},
 * {@code sketch_duration}, {@code attack_type} and a {@code report_ip_list} array; one
 * {@link DosSketchLog} is emitted per entry of that array.
 *
 * <p>NOTE(review): the generic type arguments were missing from the reviewed source. They are
 * restored here as {@code <String, Map<String, byte[]>, DosSketchLog>}; the broadcast value type
 * {@code Map<String, byte[]>} is an assumption — confirm it against the parameter type of
 * {@code IpUtils.updateIpLook} and the broadcast stream definition.
 */
public class BroadcastProcessFunc extends BroadcastProcessFunction<String, Map<String, byte[]>, DosSketchLog> {

    // Log under this class's own name (previously ParseSketchLog.class, a copy-paste slip).
    private static final Logger logger = LoggerFactory.getLogger(BroadcastProcessFunc.class);

    private static final JsonMapper jsonMapperInstance = JsonMapper.getInstance();

    /** Jackson type used to decode the top-level JSON object into a string-keyed map. */
    private static final JavaType hashmapJsonType =
            jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);

    /** Jackson type used to decode {@code report_ip_list} into a list of maps. */
    private static final JavaType listType =
            jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);

    /**
     * Initializes the function by calling {@link IpUtils#loadIpLook()}.
     *
     * @param parameters the Flink configuration passed to the rich function
     * @throws Exception if the superclass initialization or the IP lookup loading fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        logger.info("begin init");
        IpUtils.loadIpLook();
        logger.info("init over");
    }

    /**
     * Parses one sketch-log JSON string and emits a {@link DosSketchLog} for every entry of its
     * {@code report_ip_list}.
     *
     * <p>Blank input is ignored. Any parsing failure (malformed JSON, missing field, non-numeric
     * counter) is logged together with the offending input and does not propagate; entries
     * emitted before the failure stay emitted.
     *
     * @param value the raw JSON string from the main stream
     * @param ctx   read-only access to broadcast state (unused)
     * @param out   collector receiving the parsed records
     */
    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<DosSketchLog> out) throws Exception {
        if (!StringUtil.isNotBlank(value)) {
            return;
        }
        try {
            HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(value, hashmapJsonType);
            if (sketchSource == null) {
                throw new IllegalArgumentException("input could not be parsed as a JSON object");
            }

            long sketchStartTime = requiredLong(sketchSource, "sketch_start_time");
            long sketchDuration = requiredLong(sketchSource, "sketch_duration");
            String attackType = requiredString(sketchSource, "attack_type");

            // Round-trip through JSON so the nested array is decoded with the typed list mapping.
            String reportIpJson = jsonMapperInstance.toJson(requiredField(sketchSource, "report_ip_list"));
            ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(reportIpJson, listType);
            if (reportIpList == null) {
                throw new IllegalArgumentException("report_ip_list could not be parsed as a JSON array");
            }

            for (HashMap<String, Object> entry : reportIpList) {
                DosSketchLog dosSketchLog = new DosSketchLog();
                dosSketchLog.setSketch_start_time(sketchStartTime);
                dosSketchLog.setSketch_duration(sketchDuration);
                dosSketchLog.setAttack_type(attackType);
                dosSketchLog.setSource_ip(requiredString(entry, "source_ip"));
                dosSketchLog.setDestination_ip(requiredString(entry, "destination_ip"));
                dosSketchLog.setSketch_sessions(requiredLong(entry, "sketch_sessions"));
                dosSketchLog.setSketch_packets(requiredLong(entry, "sketch_packets"));
                dosSketchLog.setSketch_bytes(requiredLong(entry, "sketch_bytes"));
                out.collect(dosSketchLog);
                // Pass the object itself so toString() only runs when debug logging is enabled.
                logger.debug("数据解析成功:{}", dosSketchLog);
            }
        } catch (Exception e) {
            // A trailing Throwable is taken by SLF4J as the exception to print, so it needs
            // no placeholder of its own.
            logger.error("数据解析错误:{}", value, e);
        }
    }

    /**
     * Forwards a broadcast element to {@link IpUtils#updateIpLook}.
     *
     * @param value the broadcast payload
     * @param ctx   broadcast-side context (unused)
     * @param out   collector (unused; nothing is emitted for broadcast elements)
     */
    @Override
    public void processBroadcastElement(Map<String, byte[]> value, Context ctx, Collector<DosSketchLog> out) throws Exception {
        IpUtils.updateIpLook(value);
    }

    /** Returns the value stored under {@code key}, or throws naming the key when it is absent or null. */
    private static Object requiredField(Map<String, Object> source, String key) {
        if (source == null) {
            throw new IllegalArgumentException("missing JSON object while reading field: " + key);
        }
        Object field = source.get(key);
        if (field == null) {
            throw new IllegalArgumentException("missing required field: " + key);
        }
        return field;
    }

    /** Returns the string form of the required field {@code key}. */
    private static String requiredString(Map<String, Object> source, String key) {
        return requiredField(source, key).toString();
    }

    /** Parses the string form of the required field {@code key} as a {@code long}. */
    private static long requiredLong(Map<String, Object> source, String key) {
        return Long.parseLong(requiredString(source, key));
    }
}