package com.zdjizhi.utils.functions;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.ip.IPUtils;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

/**
 * Window function that pairs one-sided (single direction) SIP and RTP logs within a window.
 *
 * <p>Every input element is a JSON log line. Each emitted element is a
 * {@code Tuple3<key, tag, json>} where {@code tag} is one of {@code "violation"},
 * {@code "sip-in"}, {@code "sip-two"}, {@code "sip-single"}, {@code "rtp-two"} or
 * {@code "rtp-single"}.
 *
 * <ul>
 *   <li>SIP logs are paired by their SIP call id; RTP logs are paired by a normalized
 *       four-tuple key (see {@link #getFourKey}).</li>
 *   <li>Logs already flagged as double-direction are forwarded without pairing.</li>
 *   <li>Logs that are still unpaired when the window fires are emitted as
 *       {@code "sip-single"} / {@code "rtp-single"}.</li>
 * </ul>
 *
 * <p>NOTE(review): the pending maps are static and therefore shared by every instance of this
 * function in the same JVM; they are not synchronized. Confirm this operator never runs with
 * more than one instance per JVM.
 *
 * @author qidaijie
 */
public class OneSidedWindowFunction
        extends ProcessAllWindowFunction<String, Tuple3<String, String, String>, TimeWindow> {

    private static final Log logger = LogFactory.get();

    /** Output tag for records that cannot be processed because required fields are missing. */
    private static final String TAG_VIOLATION = "violation";

    /**
     * key: sip_call_id; value: the raw SIP log.
     * Holds one-sided SIP logs that have not been paired yet.
     */
    private static final Map<String, String> sipOriHmList = new HashMap<>(16);

    /**
     * key: normalized RTP four-tuple; value: the raw RTP log.
     * Holds one-sided RTP logs that have not been paired yet.
     */
    private static final Map<String, String> rtpOriHmList = new HashMap<>(16);

    /** Counter fields whose values are summed when two one-sided logs are merged. */
    private static final String[] ACCUMULATED_FIELDS = {
            JsonProConfig.SESSIONS,
            JsonProConfig.C2S_PKT_NUM,
            JsonProConfig.S2C_PKT_NUM,
            JsonProConfig.C2S_BYTE_NUM,
            JsonProConfig.S2C_BYTE_NUM,
            JsonProConfig.C2S_IPFRAG_NUM,
            JsonProConfig.S2C_IPFRAG_NUM,
            JsonProConfig.C2S_TCP_LOSTLEN,
            JsonProConfig.S2C_TCP_LOSTLEN,
            JsonProConfig.C2S_TCP_UNORDER_NUM,
            JsonProConfig.S2C_TCP_UNORDER_NUM
    };

    /**
     * Processes all logs of one window: pairs one-sided SIP/RTP logs, forwards double-direction
     * logs, and finally flushes whatever could not be paired.
     *
     * @param context window context (unused)
     * @param inputs  raw JSON log lines of the window
     * @param out     collector receiving {@code (key, tag, json)} tuples
     */
    @Override
    public void process(Context context, Iterable<String> inputs,
                        Collector<Tuple3<String, String, String>> out) throws Exception {
        for (String input : inputs) {
            if (!StringUtil.isNotBlank(input)) {
                continue;
            }
            JSONObject object = JSONObject.parseObject(input);
            if (object == null) {
                // parseObject yields null for the JSON literal "null"; nothing to process.
                continue;
            }
            String commonSchemaType = object.getString(JsonProConfig.SCHEMA_TYPE);
            if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
                handleSip(input, object, out);
            }
            if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
                handleRtp(input, object, out);
            }
        }

        // Emit the SIP / RTP logs that found no counterpart in this window.
        // An unpaired SIP log is keyed by sip_call_id and cannot be used for further correlation.
        flushUnmatched(sipOriHmList, "sip-single", out);
        // An unpaired RTP log can still be correlated later because its key is the four-tuple.
        flushUnmatched(rtpOriHmList, "rtp-single", out);
    }

    /**
     * Handles one SIP log: one-sided logs are paired by call id, double-direction logs are
     * forwarded, incomplete logs are emitted as violations. Logs without a call id are ignored.
     */
    private static void handleSip(String input, JSONObject object,
                                  Collector<Tuple3<String, String, String>> out) {
        String sipCallId = object.getString(JsonProConfig.SIP_CALL_ID);
        if (!StringUtil.isNotBlank(sipCallId)) {
            return;
        }
        // Stream direction, 1: c2s, 2: s2c, 3: double. Kept boxed so a missing field is
        // reported as a violation instead of failing the window with an unboxing NPE.
        Integer streamDir = object.getInteger(JsonProConfig.STREAM_DIR);
        if (streamDir == null || !checkSipCompleteness(object)) {
            emitViolation(input, out);
            return;
        }
        if (streamDir.intValue() != JsonProConfig.DOUBLE) {
            putKeyAndMsg(input, sipCallId, sipOriHmList, JsonProConfig.SIP_MARK, out);
        } else {
            separateInnerIp(object, out);
        }
    }

    /**
     * Handles one RTP log: one-sided logs are paired by four-tuple key, double-direction logs
     * are forwarded keyed by the four-tuple, logs lacking the four-tuple or the stream
     * direction are emitted as violations.
     */
    private static void handleRtp(String input, JSONObject object,
                                  Collector<Tuple3<String, String, String>> out) {
        Integer streamDir = object.getInteger(JsonProConfig.STREAM_DIR);
        String clientIp = object.getString(JsonProConfig.CLIENT_IP);
        Integer clientPort = object.getInteger(JsonProConfig.CLIENT_PORT);
        String serverIp = object.getString(JsonProConfig.SERVER_IP);
        Integer serverPort = object.getInteger(JsonProConfig.SERVER_PORT);
        if (streamDir == null || clientIp == null || clientPort == null
                || serverIp == null || serverPort == null) {
            emitViolation(input, out);
            return;
        }

        String rtpIpPort4Key = getFourKey(clientIp, clientPort, serverIp, serverPort);
        if (streamDir.intValue() != JsonProConfig.DOUBLE) {
            if (StringUtil.isNotBlank(rtpIpPort4Key)) {
                // Pair the one-sided RTP stream with its counterpart.
                putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, JsonProConfig.RTP_MARK, out);
            } else {
                // Every invalid four-tuple maps to the same empty key; pairing on it would
                // merge unrelated streams, so the log is emitted unpaired right away.
                out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-single", input));
            }
        } else {
            // Double-direction RTP is forwarded keyed by the four-tuple.
            out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input));
        }
    }

    /** Emits {@code message} with an empty key and the violation tag. */
    private static void emitViolation(String message, Collector<Tuple3<String, String, String>> out) {
        out.collect(new Tuple3<>("", TAG_VIOLATION, message));
    }

    /**
     * Emits every pending entry with the given tag and empties the map.
     * The map is snapshotted and cleared before emitting so that a failing collector cannot
     * leave stale entries behind in the static map.
     */
    private static void flushUnmatched(Map<String, String> pending, String tag,
                                       Collector<Tuple3<String, String, String>> out) {
        if (pending.isEmpty()) {
            return;
        }
        Map<String, String> snapshot = new HashMap<>(pending);
        pending.clear();
        for (Map.Entry<String, String> entry : snapshot.entrySet()) {
            out.collect(new Tuple3<>(entry.getKey(), tag, entry.getValue()));
        }
    }

    /**
     * Stores a one-sided log under its key, or, when a log with the same key is already
     * pending, merges both into one double-direction log and emits it.
     *
     * @param message      raw JSON of the current log
     * @param hmStrKey     correlation key (SIP call id or RTP four-tuple)
     * @param hashMapStr   pending map for the protocol
     * @param protocolType {@code JsonProConfig.SIP_MARK} or {@code JsonProConfig.RTP_MARK}
     * @param out          collector for the merged log
     */
    private static void putKeyAndMsg(String message, String hmStrKey, Map<String, String> hashMapStr,
                                     String protocolType, Collector<Tuple3<String, String, String>> out) {
        if (!hashMapStr.containsKey(hmStrKey)) {
            hashMapStr.put(hmStrKey, message);
            return;
        }

        // Correlate with the log stored earlier under the same key.
        String firstMsg = hashMapStr.remove(hmStrKey);
        JSONObject firstLog = JSONObject.parseObject(firstMsg);
        JSONObject secondLog = JSONObject.parseObject(message);

        // Stream direction, 1: c2s, 2: s2c, 3: double. When the first log is the request side
        // (c2s) its fields win the merge; otherwise the fields of the current log win.
        boolean firstIsRequestSide = firstLog.getIntValue(JsonProConfig.STREAM_DIR) == 1;
        JSONObject combined = new JSONObject();
        combined.putAll(firstIsRequestSide ? secondLog : firstLog);
        combined.putAll(firstIsRequestSide ? firstLog : secondLog);

        accumulateMsg(firstLog, secondLog, combined);
        combined.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);

        if (JsonProConfig.SIP_MARK.equals(protocolType)) {
            // Merged SIP still needs the inner/outer IP distinction before it is emitted.
            separateInnerIp(combined, out);
        } else if (JsonProConfig.RTP_MARK.equals(protocolType)) {
            // Merged RTP is emitted keyed by the four-tuple.
            combined.put(JsonProConfig.RTP_PCAP_PATH, setRtpPath(firstLog, secondLog));
            out.collect(new Tuple3<>(hmStrKey, "rtp-two", JSONObject.toJSONString(combined)));
        }
    }

    /**
     * Emits a double-direction SIP log, tagged {@code "sip-in"} when either endpoint is an
     * inner IP and {@code "sip-two"} otherwise. A log whose originator or responder port has
     * no value is emitted as a violation.
     */
    private static void separateInnerIp(JSONObject object, Collector<Tuple3<String, String, String>> out) {
        String sipOriginatorIp = object.getString(JsonProConfig.SIP_ORIGINATOR_IP);
        String sipResponderIp = object.getString(JsonProConfig.SIP_RESPONDER_IP);
        Integer sipOriginatorPort = object.getInteger(JsonProConfig.SIP_ORIGINATOR_PORT);
        Integer sipResponderPort = object.getInteger(JsonProConfig.SIP_RESPONDER_PORT);
        if (sipOriginatorPort == null || sipResponderPort == null) {
            // A key can be present with a null value; avoid the unboxing NPE.
            emitViolation(JSONObject.toJSONString(object), out);
            return;
        }

        if (IPUtils.isInnerIp(sipOriginatorIp) || IPUtils.isInnerIp(sipResponderIp)) {
            // Key layout: from-ip, from-port, to-ip, to-port (not normalized).
            String sipInnerEmitKey =
                    joinKey(sipOriginatorIp, sipOriginatorPort, sipResponderIp, sipResponderPort);
            out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JSONObject.toJSONString(object)));
        } else {
            // Forward downstream keyed by the normalized four-tuple.
            String sipIpPort4Key =
                    getFourKey(sipOriginatorIp, sipOriginatorPort, sipResponderIp, sipResponderPort);
            out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JSONObject.toJSONString(object)));
        }
    }

    /** Concatenates ip/port/ip/port with {@code VoipRelationConfig.CORRELATION_STR} as separator. */
    private static String joinKey(String firstIp, int firstPort, String secondIp, int secondPort) {
        return firstIp + VoipRelationConfig.CORRELATION_STR + firstPort
                + VoipRelationConfig.CORRELATION_STR + secondIp
                + VoipRelationConfig.CORRELATION_STR + secondPort;
    }

    /**
     * Builds a direction-independent four-tuple key: the endpoint with the smaller port comes
     * first; when the ports are equal the endpoint with the smaller IP comes first.
     *
     * @param commonClientIp   client IP
     * @param commonClientPort client port
     * @param commonServerIp   server IP
     * @param commonServerPort server port
     * @return the normalized key, or an empty string when the tuple cannot be ordered
     */
    private static String getFourKey(String commonClientIp, int commonClientPort,
                                     String commonServerIp, int commonServerPort) {
        switch (compareNum(commonClientPort, commonServerPort)) {
            case 1:
                // client port > server port
                return joinKey(commonServerIp, commonServerPort, commonClientIp, commonClientPort);
            case -1:
                // client port < server port
                return joinKey(commonClientIp, commonClientPort, commonServerIp, commonServerPort);
            case 0:
                // equal ports: fall back to comparing the IPs
                return compareIp(commonClientIp, commonServerIp, commonClientPort, commonServerPort);
            default:
                // -2: a port value is invalid
                logger.error("compareNum is error," + "common_client_port:" + commonClientPort + ","
                        + "commonServerPort:" + commonServerPort);
                return "";
        }
    }

    /**
     * Orders the two endpoints by numeric IP value and builds the key, smaller IP first.
     *
     * @param commonClientIp   client IP
     * @param commonServerIp   server IP
     * @param commonClientPort client port
     * @param commonServerPort server port
     * @return the key, or an empty string when the IPs are equal or invalid
     */
    private static String compareIp(String commonClientIp, String commonServerIp,
                                    int commonClientPort, int commonServerPort) {
        long clientIpNum = IPUtils.ipToLong(commonClientIp);
        long serverIpNum = IPUtils.ipToLong(commonServerIp);
        switch (compareNum(clientIpNum, serverIpNum)) {
            case 1:
                // client IP > server IP
                return joinKey(commonServerIp, commonServerPort, commonClientIp, commonClientPort);
            case -1:
                // client IP < server IP
                return joinKey(commonClientIp, commonClientPort, commonServerIp, commonServerPort);
            default:
                // 0: both endpoints are identical, which is treated as invalid; -2: invalid IP value
                logger.error("compareNum is error," + "common_client_ip:" + commonClientIp + ","
                        + "commonServerIp:" + commonServerIp + ","
                        + "commonClientPort:" + commonClientPort + ","
                        + "commonServerPort:" + commonServerPort);
                return "";
        }
    }

    /**
     * Writes into {@code sipOrRtpCombin} the sum of every counter field of the two one-sided logs.
     *
     * @param firstSipOrRtpLog  first one-sided log
     * @param secendSipOrRtpLog second one-sided log
     * @param sipOrRtpCombin    merged log receiving the sums
     */
    private static void accumulateMsg(JSONObject firstSipOrRtpLog, JSONObject secendSipOrRtpLog,
                                      JSONObject sipOrRtpCombin) {
        for (String field : ACCUMULATED_FIELDS) {
            sipOrRtpCombin.put(field,
                    firstSipOrRtpLog.getLongValue(field) + secendSipOrRtpLog.getLongValue(field));
        }
    }

    /**
     * Compares two positive ints.
     *
     * @param numOne left operand
     * @param numTwo right operand
     * @return 1 if left &gt; right, -1 if left &lt; right, 0 if equal, -2 if either is not positive
     */
    private static int compareNum(int numOne, int numTwo) {
        if (numOne > 0 && numTwo > 0) {
            return Integer.compare(numOne, numTwo);
        }
        return -2;
    }

    /**
     * Compares two positive longs.
     *
     * @param numOne left operand
     * @param numTwo right operand
     * @return 1 if left &gt; right, -1 if left &lt; right, 0 if equal, -2 if either is not positive
     */
    private static int compareNum(long numOne, long numTwo) {
        if (numOne > 0 && numTwo > 0) {
            return Long.compare(numOne, numTwo);
        }
        return -2;
    }

    /**
     * Determines the pcap path of a merged RTP log: identical paths collapse into one,
     * different paths are joined with {@code ";"}, and when only one side has a path that
     * one is returned.
     *
     * @param firstSipOrRtpLog  first one-sided log
     * @param secendSipOrRtpLog second one-sided log
     * @return the resulting path; the second log's (possibly blank) value when the first is blank
     */
    private static String setRtpPath(Map<String, Object> firstSipOrRtpLog,
                                     Map<String, Object> secendSipOrRtpLog) {
        String firstRtpPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
        String secondRtpPcapPath = JsonParseUtil.getString(secendSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);

        if (!StringUtil.isNotBlank(firstRtpPcapPath)) {
            return secondRtpPcapPath;
        }
        if (!StringUtil.isNotBlank(secondRtpPcapPath) || firstRtpPcapPath.equals(secondRtpPcapPath)) {
            return firstRtpPcapPath;
        }
        return firstRtpPcapPath + ";" + secondRtpPcapPath;
    }

    /** Returns whether the SIP log contains all four originator/responder IP and port keys. */
    private static boolean checkSipCompleteness(JSONObject object) {
        return object.containsKey(JsonProConfig.SIP_ORIGINATOR_IP)
                && object.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT)
                && object.containsKey(JsonProConfig.SIP_RESPONDER_IP)
                && object.containsKey(JsonProConfig.SIP_RESPONDER_PORT);
    }
}