This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
galaxy-tsg-olap-log-stream-…/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java

368 lines
16 KiB
Java
Raw Normal View History

2021-09-27 11:14:34 +08:00
package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.ip.IPUtils;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
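/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description: Window function that pairs one-sided (single-direction) SIP and RTP logs
* inside each window and emits the merged or still-unpaired logs as keyed tuples.
* @date 2021/8/18 18:04
*/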
public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tuple3<String, String, String>, TimeWindow> {
// Logger used by the static helpers of this class to report abnormal ports/IPs.
private static final Log logger = LogFactory.get();
/**
* Pending one-sided SIP logs that have not been paired yet.
* Key: sip_call_id; value: the raw SIP log.
* process() drains and clears this map at the end of every invocation.
* NOTE(review): static, unsynchronized HashMap shared by every instance in the JVM -
* confirm this function is only ever driven by a single thread.
*/
private static HashMap<String, String> sipOriHmList = new HashMap<>(16);
/**
* Pending one-sided RTP logs that have not been paired yet.
* Key: the key built from the RTP four-tuple (see getFourKey); value: the raw RTP log.
* process() drains and clears this map at the end of every invocation.
*/
private static HashMap<String, String> rtpOriHmList = new HashMap<>(16);
/**
 * Correlates the one-sided SIP/RTP logs of one window and emits the results.
 *
 * <p>For every non-blank input the JSON is parsed and dispatched by schema type:
 * <ul>
 *   <li>SIP with a call id: one-sided logs are paired by call id through
 *       {@code putKeyAndMsg}; two-sided logs go straight to {@code separateInnerIp}.</li>
 *   <li>RTP: one-sided logs are paired by their four-tuple key; two-sided logs are
 *       emitted as {@code "rtp-two"}.</li>
 * </ul>
 * Logs still unpaired when the window has been consumed are emitted as
 * {@code "sip-single"} / {@code "rtp-single"} and the pending maps are cleared.
 *
 * <p>Records that cannot be parsed or that carry no stream direction are logged and
 * skipped so that one bad record does not abort the whole window.
 *
 * @param context window context (not used)
 * @param inputs  raw JSON logs collected in the window
 * @param out     collector receiving (key, tag, json) tuples
 */
@Override
public void process(Context context, Iterable<String> inputs, Collector<Tuple3<String, String, String>> out) throws Exception {
    for (String input : inputs) {
        if (!StringUtil.isNotBlank(input)) {
            continue;
        }
        JSONObject object;
        try {
            object = JSONObject.parseObject(input);
        } catch (RuntimeException e) {
            // fastjson reports malformed input with unchecked exceptions; skip the record only.
            logger.error("skip log that is not a valid JSON object: " + input + ", cause: " + e);
            continue;
        }
        if (object == null) {
            continue;
        }
        String commonSchemaType = object.getString(JsonProConfig.SCHEMA_TYPE);
        String sipCallId = object.getString(JsonProConfig.SIP_CALL_ID);
        boolean sipLog = JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId);
        boolean rtpLog = JsonProConfig.RTP_MARK.equals(commonSchemaType);
        if (!sipLog && !rtpLog) {
            continue;
        }
        // Stream direction: 1 = c2s, 2 = s2c, 3 = double (both directions).
        Integer streamDirValue = object.getInteger(JsonProConfig.STREAM_DIR);
        if (streamDirValue == null) {
            // Unboxing null here used to throw a NullPointerException for the whole window.
            logger.error("skip log without stream direction: " + input);
            continue;
        }
        int commonStreamDir = streamDirValue.intValue();

        if (sipLog) {
            if (commonStreamDir != JsonProConfig.DOUBLE) {
                // Pass the same constant putKeyAndMsg compares against, not a look-alike literal.
                putKeyAndMsg(input, sipCallId, sipOriHmList, JsonProConfig.SIP_MARK, out);
            } else {
                separateInnerIp(object, out);
            }
        }

        if (rtpLog) {
            // getIntValue yields 0 for a missing port; getFourKey reports it and returns "".
            String rtpIpPort4Key = getFourKey(object.getString(JsonProConfig.CLIENT_IP),
                    object.getIntValue(JsonProConfig.CLIENT_PORT),
                    object.getString(JsonProConfig.SERVER_IP),
                    object.getIntValue(JsonProConfig.SERVER_PORT));
            if (commonStreamDir == JsonProConfig.DOUBLE) {
                // Two-sided RTP: emit keyed by four-tuple.
                out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input));
            } else if (rtpIpPort4Key.isEmpty()) {
                // No usable four-tuple: pairing on the shared empty key would merge unrelated
                // flows, so emit the log as unpaired right away.
                out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-single", input));
            } else {
                putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, JsonProConfig.RTP_MARK, out);
            }
        }
    }

    // Unpaired SIP is keyed by sip_call_id; unpaired RTP keeps its four-tuple key and can
    // still be correlated downstream.
    flushUnmatched(sipOriHmList, "sip-single", out);
    flushUnmatched(rtpOriHmList, "rtp-single", out);
}

/**
 * Emits every pending one-sided log with the given tag and empties the pending map.
 * The map is snapshotted and cleared before emitting, so it is empty even if emitting fails.
 */
private static void flushUnmatched(HashMap<String, String> pending, String tag, Collector<Tuple3<String, String, String>> out) {
    if (pending.isEmpty()) {
        return;
    }
    Map<String, String> snapshot = new HashMap<>(pending);
    pending.clear();
    for (Map.Entry<String, String> entry : snapshot.entrySet()) {
        out.collect(new Tuple3<>(entry.getKey(), tag, entry.getValue()));
    }
}
/**
 * Pairs a one-sided log with the log previously cached under the same key.
 *
 * <p>If nothing is cached for {@code hmStrKey} the message is cached and nothing is emitted.
 * Otherwise the cached log is removed and merged with {@code message}: on conflicting fields
 * the log whose stream direction is 1 (c2s) wins when the cached log is the c2s side,
 * otherwise the new message wins. Counters are summed by {@code accumulateMsg}, the stream
 * direction is set to double, and the merged log is emitted (SIP through
 * {@code separateInnerIp}, RTP as {@code "rtp-two"} with the combined pcap path).
 *
 * @param message      raw JSON of the current one-sided log
 * @param hmStrKey     correlation key (sip_call_id for SIP, four-tuple key for RTP)
 * @param hashMapStr   cache of pending one-sided logs for this protocol
 * @param protocolType protocol marker, compared with JsonProConfig.SIP_MARK / RTP_MARK
 * @param out          collector for the merged log
 */
private static void putKeyAndMsg(String message, String hmStrKey, HashMap<String, String> hashMapStr, String protocolType, Collector<Tuple3<String, String, String>> out) {
    String firstMsg = hashMapStr.remove(hmStrKey);
    if (firstMsg == null) {
        // First side seen for this key: wait for the opposite direction.
        hashMapStr.put(hmStrKey, message);
        return;
    }

    JSONObject firstSipOrRtpLog = JSONObject.parseObject(firstMsg);
    JSONObject secendSipOrRtpLog = JSONObject.parseObject(message);

    // Direction 1 = c2s. The side applied last overrides shared fields.
    boolean firstIsRequestSide = Integer.valueOf(1).equals(firstSipOrRtpLog.getInteger(JsonProConfig.STREAM_DIR));
    JSONObject baseLog = firstIsRequestSide ? secendSipOrRtpLog : firstSipOrRtpLog;
    JSONObject overridingLog = firstIsRequestSide ? firstSipOrRtpLog : secendSipOrRtpLog;

    // Build the merged view directly from the parsed objects instead of re-parsing both
    // messages and round-tripping the result through a JSON string.
    JSONObject sipOrRtpCombin = new JSONObject();
    sipOrRtpCombin.putAll(baseLog);
    sipOrRtpCombin.putAll(overridingLog);

    accumulateMsg(firstSipOrRtpLog, secendSipOrRtpLog, sipOrRtpCombin);
    sipOrRtpCombin.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);

    if (JsonProConfig.SIP_MARK.equals(protocolType)) {
        // Merged SIP: split by inner/outer IP before emitting.
        separateInnerIp(sipOrRtpCombin, out);
    } else if (JsonProConfig.RTP_MARK.equals(protocolType)) {
        // Merged RTP: emit keyed by four-tuple, carrying both pcap paths.
        sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPath(firstSipOrRtpLog, secendSipOrRtpLog));
        out.collect(new Tuple3<>(hmStrKey, "rtp-two", JSONObject.toJSONString(sipOrRtpCombin)));
    } else {
        // Previously the merged record vanished without a trace.
        logger.error("unsupported protocol type '" + protocolType + "', merged log for key " + hmStrKey + " is discarded");
    }
}
/**
 * Emits a two-sided SIP log, tagged by whether it involves an inner (private) IP.
 *
 * <p>If the originator or responder IP is an inner IP, the log is emitted as {@code "sip-in"}
 * keyed by originator-ip, originator-port, responder-ip, responder-port in that order.
 * Otherwise it is emitted as {@code "sip-two"} keyed by the normalised four-tuple key from
 * {@code getFourKey}.
 *
 * @param object two-sided SIP log
 * @param out    collector for the tagged log
 */
private static void separateInnerIp(JSONObject object, Collector<Tuple3<String, String, String>> out) {
    String sipOriginatorIp = object.getString(JsonProConfig.SIP_ORIGINATOR_IP);
    String sipResponderIp = object.getString(JsonProConfig.SIP_RESPONDER_IP);
    // getIntValue returns 0 for an absent port; unboxing getInteger's null used to throw a
    // NullPointerException. getFourKey reports non-positive ports on the "sip-two" path.
    int sipOriginatorPort = object.getIntValue(JsonProConfig.SIP_ORIGINATOR_PORT);
    int sipResponderPort = object.getIntValue(JsonProConfig.SIP_RESPONDER_PORT);

    if (IPUtils.isInnerIp(sipOriginatorIp) || IPUtils.isInnerIp(sipResponderIp)) {
        // Key layout: from-ip, from-port, to-ip, to-port.
        String sipInnerEmitKey = sipOriginatorIp + VoipRelationConfig.CORRELATION_STR
                + sipOriginatorPort + VoipRelationConfig.CORRELATION_STR
                + sipResponderIp + VoipRelationConfig.CORRELATION_STR
                + sipResponderPort;
        out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JSONObject.toJSONString(object)));
        return;
    }

    String sipIpPort4Key = getFourKey(sipOriginatorIp, sipOriginatorPort, sipResponderIp, sipResponderPort);
    out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JSONObject.toJSONString(object)));
}
/**
 * Builds a direction-independent four-tuple key.
 *
 * <p>The endpoint with the smaller port is written first; when both ports are equal the
 * ordering is delegated to {@code compareIp}. If {@code compareNum} reports an abnormal
 * port, an error is logged and an empty string is returned.
 *
 * @param commonClientIp   client IP
 * @param commonClientPort client port
 * @param commonServerIp   server IP
 * @param commonServerPort server port
 * @return the ordered "ip, port, ip, port" key joined by the correlation separator,
 *         or "" when no key can be built
 */
private static String getFourKey(String commonClientIp, int commonClientPort, String commonServerIp, int commonServerPort) {
    int portOrder = compareNum(commonClientPort, commonServerPort);

    if (portOrder == 0) {
        // Same port on both sides: order by IP instead.
        return compareIp(commonClientIp, commonServerIp, commonClientPort, commonServerPort);
    }
    if (portOrder == 1) {
        // Client port is larger: server endpoint goes first.
        return commonServerIp + VoipRelationConfig.CORRELATION_STR
                + commonServerPort + VoipRelationConfig.CORRELATION_STR
                + commonClientIp + VoipRelationConfig.CORRELATION_STR
                + commonClientPort;
    }
    if (portOrder == -1) {
        // Client port is smaller: client endpoint goes first.
        return commonClientIp + VoipRelationConfig.CORRELATION_STR
                + commonClientPort + VoipRelationConfig.CORRELATION_STR
                + commonServerIp + VoipRelationConfig.CORRELATION_STR
                + commonServerPort;
    }

    // Abnormal port value.
    logger.error("compareNum is error," +
            "common_client_port:" + commonClientPort + "," +
            "commonServerPort:" + commonServerPort);
    return "";
}
/**
 * Orders two endpoints by the numeric value of their IPs and builds the key.
 *
 * <p>The endpoint with the smaller IP is written first. Equal IPs, or an abnormal result
 * from {@code compareNum}, are logged as an error and yield an empty string.
 *
 * @param commonClientIp   client IP
 * @param commonServerIp   server IP
 * @param commonClientPort client port
 * @param commonServerPort server port
 * @return the ordered key, or "" when the IPs cannot be ordered
 */
private static String compareIp(String commonClientIp, String commonServerIp, int commonClientPort, int commonServerPort) {
    long clientIpNum = IPUtils.ipToLong(commonClientIp);
    long serverIpNum = IPUtils.ipToLong(commonServerIp);
    int ipOrder = compareNum(clientIpNum, serverIpNum);

    if (ipOrder == 1) {
        // Client IP is larger: server endpoint goes first.
        return commonServerIp + VoipRelationConfig.CORRELATION_STR
                + commonServerPort + VoipRelationConfig.CORRELATION_STR
                + commonClientIp + VoipRelationConfig.CORRELATION_STR
                + commonClientPort;
    }
    if (ipOrder == -1) {
        // Client IP is smaller: client endpoint goes first.
        return commonClientIp + VoipRelationConfig.CORRELATION_STR
                + commonClientPort + VoipRelationConfig.CORRELATION_STR
                + commonServerIp + VoipRelationConfig.CORRELATION_STR
                + commonServerPort;
    }

    // Identical IPs (0) and abnormal IP values (-2 or anything else) are both errors.
    logger.error("compareNum is error," +
            "common_client_ip:" + commonClientIp + "," +
            "commonServerIp:" + commonServerIp + "," +
            "commonClientPort:" + commonClientPort + "," +
            "commonServerPort:" + commonServerPort);
    return "";
}
/**
 * Sums the traffic counters of both one-sided logs into the merged log.
 *
 * <p>Each counter is read with {@code getLongValue} from both sides and the sum is
 * written to the merged log under the same key.
 *
 * @param firstSipOrRtpLog  first one-sided log
 * @param secendSipOrRtpLog second one-sided log
 * @param sipOrRtpCombin    merged log that receives the summed counters
 */
private static void accumulateMsg(JSONObject firstSipOrRtpLog, JSONObject secendSipOrRtpLog, JSONObject sipOrRtpCombin) {
    // Counters to accumulate: sessions, then c2s/s2c packets, bytes, IP fragments,
    // TCP lost length and TCP out-of-order count.
    final String[] counterKeys = {
            JsonProConfig.SESSIONS,
            JsonProConfig.C2S_PKT_NUM,
            JsonProConfig.S2C_PKT_NUM,
            JsonProConfig.C2S_BYTE_NUM,
            JsonProConfig.S2C_BYTE_NUM,
            JsonProConfig.C2S_IPFRAG_NUM,
            JsonProConfig.S2C_IPFRAG_NUM,
            JsonProConfig.C2S_TCP_LOSTLEN,
            JsonProConfig.S2C_TCP_LOSTLEN,
            JsonProConfig.C2S_TCP_UNORDER_NUM,
            JsonProConfig.S2C_TCP_UNORDER_NUM
    };
    for (String counterKey : counterKeys) {
        long total = firstSipOrRtpLog.getLongValue(counterKey) + secendSipOrRtpLog.getLongValue(counterKey);
        sipOrRtpCombin.put(counterKey, total);
    }
}
/**
 * Compares two positive {@code int} values.
 *
 * @param numOne left operand
 * @param numTwo right operand
 * @return 1 if left &gt; right, -1 if left &lt; right, 0 if equal;
 *         -2 if either operand is not strictly positive
 */
private static int compareNum(int numOne, int numTwo) {
    if (numOne <= 0 || numTwo <= 0) {
        return -2;
    }
    if (numOne == numTwo) {
        return 0;
    }
    return numOne > numTwo ? 1 : -1;
}
/**
 * Compares two positive {@code long} values.
 *
 * @param numOne left operand
 * @param numTwo right operand
 * @return 1 if left &gt; right, -1 if left &lt; right, 0 if equal;
 *         -2 if either operand is not strictly positive
 */
private static int compareNum(long numOne, long numTwo) {
    if (numOne <= 0 || numTwo <= 0) {
        return -2;
    }
    if (numOne == numTwo) {
        return 0;
    }
    return numOne > numTwo ? 1 : -1;
}
/**
 * Chooses the pcap path for a merged RTP log.
 *
 * <p>When both one-sided logs carry a non-blank path, identical paths are returned once
 * and different paths are joined with {@code ";"}. When only one side has a non-blank
 * path, that path is returned; when the first is blank, the second is returned as is.
 *
 * @param firstSipOrRtpLog  first one-sided log
 * @param secendSipOrRtpLog second one-sided log
 * @return the pcap path to store on the merged log
 */
private static String setRtpPath(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secendSipOrRtpLog) {
    String firstRtpPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
    String secendRtpPcapPath = JsonParseUtil.getString(secendSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);

    if (!StringUtil.isNotBlank(firstRtpPcapPath)) {
        // Nothing usable on the first side: fall back to whatever the second side has.
        return secendRtpPcapPath;
    }
    if (StringUtil.isNotBlank(secendRtpPcapPath) && !firstRtpPcapPath.equals(secendRtpPcapPath)) {
        // Two distinct capture files: keep both.
        return firstRtpPcapPath + ";" + secendRtpPcapPath;
    }
    return firstRtpPcapPath;
}
}