1:重构VOIP任务结构,使用多window的方式进行融合。

2:增加VSYS融合维度(TSG-11721)。
This commit is contained in:
qidaijie
2022-09-14 16:58:06 +08:00
parent 3df5d8c51e
commit 9d2f2cfd89
30 changed files with 587 additions and 649 deletions

21
pom.xml
View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-stream-voip-relation</artifactId>
<version>220418-Nacos</version>
<version>220914-VSYS</version>
<name>log-stream-voip-relation</name>
<url>http://www.example.com</url>
@@ -23,10 +23,10 @@
<id>maven-ali</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<!--<enabled>true</enabled>-->
<enabled>true</enabled>
</releases>
<snapshots>
<!--<enabled>true</enabled>-->
<enabled>true</enabled>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
@@ -207,23 +207,10 @@
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
<version>5.7.17</version>
</dependency>
<dependency>

View File

@@ -48,8 +48,6 @@ nacos.pin=nacos
#nacos group
nacos.group=Galaxy
#====================Topology Default====================#
#check ip is Inner network;0 off, 1 on.
check.inner.network=0

View File

@@ -15,7 +15,7 @@ tools.library=D:\\workerspace\\dat\\
nacos.server=192.168.44.12:8848
#nacos namespace
nacos.schema.namespace=flink
nacos.schema.namespace=test
#nacos data id
nacos.data.id=voip_record.json
@@ -32,13 +32,19 @@ group.id=mytest-1
#--------------------------------topology配置------------------------------#
#map函数并行度
window.parallelism=1
merge.uniflow.window.parallelism=1
#voip日志对准窗口时间 seconds
voip.calibration.window.time=60
#map函数并行度
calibration.window.parallelism=1
#单向流对准窗口时间 seconds
one.sided.window.time=10
#voip日志对准窗口时间 seconds
voip.calibration.window.time=30
#voip二次对准时间 seconds
sec.combine.sr.cache.secs=120
sec.combine.sr.cache.secs=60
#check ip is Inner network;0 off, 1 on.
check.inner.network=0

View File

@@ -1,8 +1,6 @@
package com.zdjizhi.common;
import com.zdjizhi.utils.system.VoipRelationConfigurations;
/**
* @author Administrator
*/
@@ -13,123 +11,103 @@ public class JsonProConfig {
public static final int DOUBLE = 3;
/**
*
* SIP日志标识
*/
public static final String SIP_MARK = "SIP";
/**
*
* RTP日志标识
*/
public static final String RTP_MARK = "RTP";
/**
*
* 所属vsys缺省为1
*/
public static final String VSYS_ID = "common_vsys_id";
/**
* 日志类型
*/
public static final String SCHEMA_TYPE = "common_schema_type";
/**
*
* 会话结束时间
*/
public static final String END_TIME = "common_end_time";
/**
*
* 流类型
* 1c2s2s2c3double
*/
public static final String STREAM_DIR = "common_stream_dir";
/**
*
*/
public static final String SESSIONS = "common_sessions";
/**
*
*/
public static final String C2S_PKT_NUM = "common_c2s_pkt_num";
/**
*
*/
public static final String S2C_PKT_NUM = "common_s2c_pkt_num";
/**
*
*/
public static final String C2S_BYTE_NUM = "common_c2s_byte_num";
/**
*
*/
public static final String S2C_BYTE_NUM = "common_s2c_byte_num";
/**
*
*/
public static final String C2S_IPFRAG_NUM = "common_c2s_ipfrag_num";
/**
*
*/
public static final String S2C_IPFRAG_NUM = "common_s2c_ipfrag_num";
/**
*
*/
public static final String C2S_TCP_LOSTLEN = "common_c2s_tcp_lostlen";
/**
*
*/
public static final String S2C_TCP_LOSTLEN = "common_s2c_tcp_lostlen";
/**
*
*/
public static final String C2S_TCP_UNORDER_NUM = "common_c2s_tcp_unorder_num";
/**
*
*/
public static final String S2C_TCP_UNORDER_NUM = "common_s2c_tcp_unorder_num";
/**
*
* 客户端ip地址
*/
public static final String CLIENT_IP = "common_client_ip";
/**
*
* 客户端端口
*/
public static final String CLIENT_PORT = "common_client_port";
/**
*
* 服务端ip地址
*/
public static final String SERVER_IP = "common_server_ip";
/**
*
* 服务端端口
*/
public static final String SERVER_PORT = "common_server_port";
/**
*
* 会话ID
*/
public static final String SIP_CALL_ID = "sip_call_id";
/**
*
* 协商的主叫语音传输IP
*/
public static final String SIP_ORIGINATOR_IP = "sip_originator_sdp_connect_ip";
/**
*
* 协商的主叫语音传输端口
*/
public static final String SIP_ORIGINATOR_PORT = "sip_originator_sdp_media_port";
/**
*
* 协商的被叫语音传输IP
*/
public static final String SIP_RESPONDER_IP = "sip_responder_sdp_connect_ip";
/**
*
* 协商的被叫语音传输端口
*/
public static final String SIP_RESPONDER_PORT = "sip_responder_sdp_media_port";
/**
*
* RTP原始包文件地址
*/
public static final String RTP_PCAP_PATH = "rtp_pcap_path";
/**
*
* 主叫方向
*/
public static final String RTP_ORIGINATOR_DIR = "rtp_originator_dir";
/**
*
* c2s编码方式序号PT
*/
public static final String RTP_PAYLOAD_TYPE_C2S = "rtp_payload_type_c2s";
/**
*
* s2c编码方式序号PT
*/
public static final String RTP_PAYLOAD_TYPE_S2C = "rtp_payload_type_s2c";
/**
* 各类Transmission指标key
*/
public static final String SESSIONS = "common_sessions";
public static final String C2S_PKT_NUM = "common_c2s_pkt_num";
public static final String S2C_PKT_NUM = "common_s2c_pkt_num";
public static final String C2S_BYTE_NUM = "common_c2s_byte_num";
public static final String S2C_BYTE_NUM = "common_s2c_byte_num";
public static final String C2S_IPFRAG_NUM = "common_c2s_ipfrag_num";
public static final String S2C_IPFRAG_NUM = "common_s2c_ipfrag_num";
public static final String C2S_TCP_LOSTLEN = "common_c2s_tcp_lostlen";
public static final String S2C_TCP_LOSTLEN = "common_s2c_tcp_lostlen";
public static final String C2S_TCP_UNORDER_NUM = "common_c2s_tcp_unorder_num";
public static final String S2C_TCP_UNORDER_NUM = "common_s2c_tcp_unorder_num";
}

View File

@@ -1,7 +1,7 @@
package com.zdjizhi.common;
import com.zdjizhi.utils.system.VoipRelationConfigurations;
import com.zdjizhi.tools.system.VoipRelationConfigurations;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
@@ -39,7 +39,8 @@ public class VoipRelationConfig {
*/
public static final Integer VOIP_CALIBRATION_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "voip.calibration.window.time");
public static final Integer ONE_SIDED_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "one.sided.window.time");
public static final Integer WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "window.parallelism");
public static final Integer MERGE_UNIFLOW_WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "merge.uniflow.window.parallelism");
public static final Integer CALIBRATION_WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "calibration.window.parallelism");
/**
* connection kafka
@@ -72,15 +73,11 @@ public class VoipRelationConfig {
public static final Integer MAX_REQUEST_SIZE = VoipRelationConfigurations.getIntProperty(1, "max.request.size");
public static final String TOOLS_LIBRARY = VoipRelationConfigurations.getStringProperty(0, "tools.library");
/**
* http
*/
public static final String SCHEMA_HTTP = VoipRelationConfigurations.getStringProperty(0, "schema.http");
/**
* voip
*/
public static final Integer SEC_COMBINE_SR_CACHE_SECS = VoipRelationConfigurations.getIntProperty(0, "sec.combine.sr.cache.secs");
public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(1, "check.inner.network");
public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(0, "check.inner.network");
}

View File

@@ -0,0 +1,17 @@
package com.zdjizhi.operator.filter;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.FilterFunction;
/**
* @author qidaijie
* @Package com.zdjizhi.operator.filter
* @Description:
* @date 2022/9/214:59
*/
public class NullFilterFunction implements FilterFunction<String> {
@Override
public boolean filter(String message) throws Exception {
return StringUtil.isNotBlank(message);
}
}

View File

@@ -0,0 +1,22 @@
package com.zdjizhi.operator.group;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.operator.group
* @Description:
* @date 2022/8/2911:08
*/
public class MergeUniFlowKeyByFunction implements KeySelector<Tuple3<String, Map<String, Object>, Integer>, Integer> {
@Override
public Integer getKey(Tuple3<String, Map<String, Object>, Integer> value) throws Exception {
//vsys id
return value.f2;
}
}

View File

@@ -0,0 +1,27 @@
package com.zdjizhi.operator.group;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.operator.group
* @Description:
* @date 2022/8/2911:08
*/
public class VoipCalibrationKeyByFunction implements KeySelector<Tuple4<String, String, Map<String, Object>, Integer>, Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> getKey(Tuple4<String, String, Map<String, Object>, Integer> value) throws Exception {
String fourKey = value.f0;
Integer vsysId = value.f3;
return new Tuple2<>(fourKey, vsysId);
}
}

View File

@@ -0,0 +1,82 @@
package com.zdjizhi.operator.parse;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.tools.utils.RelationUtils;
import com.zdjizhi.tools.json.JsonParseUtil;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.operator.parse
* @Description:
* @date 2022/9/916:21
*/
public class ParseMapFunction implements MapFunction<String, Tuple3<String, Map<String, Object>, Integer>> {
private static final Log logger = LogFactory.get();
@Override
public Tuple3<String, Map<String, Object>, Integer> map(String input) throws Exception {
try {
if (StringUtil.isNotBlank(input)) {
Map<String, Object> jsonMap = JsonParseUtil.typeTransform((Map<String, Object>) JsonMapper.fromJsonString(input, Map.class));
String commonSchemaType = JsonParseUtil.getString(jsonMap, JsonProConfig.SCHEMA_TYPE);
Integer vsysId = getVsysId(jsonMap);
String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID);
//1c2s2s2c3double
int commonStreamDir = JsonParseUtil.getInteger(jsonMap, JsonProConfig.STREAM_DIR);
if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
if (StringUtil.isNotBlank(sipCallId)) {
if (RelationUtils.checkSipCompleteness(jsonMap)) {
if (commonStreamDir != JsonProConfig.DOUBLE) {
return new Tuple3<>("sip-single", jsonMap, vsysId);
} else {
return new Tuple3<>("sip-double", jsonMap, vsysId);
}
} else {
return new Tuple3<>("violation", jsonMap, vsysId);
}
} else {
return new Tuple3<>("violation", jsonMap, vsysId);
}
} else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
if (commonStreamDir == JsonProConfig.DOUBLE) {
return new Tuple3<>("rtp-double", jsonMap, vsysId);
} else {
return new Tuple3<>("rtp-single", jsonMap, vsysId);
}
}
}
} catch (RuntimeException e) {
logger.error("TransForm logs failed,The exception is :" + e.getMessage());
}
return null;
}
/**
* 获取VSYS ID若无该字段则默认值为1
*
* @param jsonMap 原始日志
* @return vsysid
*/
private static int getVsysId(Map<String, Object> jsonMap) {
Object value = jsonMap.getOrDefault(JsonProConfig.VSYS_ID, null);
if (value != null) {
return Integer.parseInt(value.toString());
} else {
return 1;
}
}
}

View File

@@ -1,4 +1,4 @@
package com.zdjizhi.utils.functions;
package com.zdjizhi.operator.window;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -7,167 +7,162 @@ import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IPUtil;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
import com.zdjizhi.tools.utils.RelationUtils;
import com.zdjizhi.tools.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Int;
import java.util.HashMap;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Package com.zdjizhi.operator
* @Description:
* @date 2021/8/1818:04
* @date 2022/8/2911:44
*/
public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tuple3<String, String, String>, TimeWindow> {
public class MergeUniFlowWindowFunction extends ProcessWindowFunction<Tuple3<String, Map<String, Object>, Integer>, Tuple4<String, String, Map<String, Object>, Integer>, Integer, TimeWindow> {
private static final Log logger = LogFactory.get();
/**
* key-sip_call_id;value为sip的具体数据---存放的是SIP未关联的数据(单向流)
*/
private static HashMap<String, String> sipOriHmList = new HashMap<>(32);
private static HashMap<String, Map<String, Object>> sipOriHmList = new HashMap<>(32);
/**
* key-rtp拼接的四元组;value为rtp的具体数据---存放的是RTP未关联的数据(单向流)
*/
private static HashMap<String, String> rtpOriHmList = new HashMap<>(32);
private static HashMap<String, Map<String, Object>> rtpOriHmList = new HashMap<>(32);
@Override
@SuppressWarnings("unchecked")
public void process(Context context, Iterable<String> inputs, Collector<Tuple3<String, String, String>> out) {
for (String input : inputs) {
if (StringUtil.isNotBlank(input)) {
try {
Map<String, Object> object = JsonParseUtil.typeTransform((Map<String, Object>) JsonMapper.fromJsonString(input, Map.class));
public void process(Integer key, Context context, Iterable<Tuple3<String, Map<String, Object>, Integer>> inputs, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out) throws Exception {
for (Tuple3<String, Map<String, Object>, Integer> input : inputs) {
if (input != null) {
//已关联的sip,rtp;未关联的sip,rtp;内网的sip
String type = input.f0;
Map<String, Object> jsonMap = input.f1;
Integer vsysId = input.f2;
String commonSchemaType = JsonParseUtil.getString(object, JsonProConfig.SCHEMA_TYPE);
String sipCallId = JsonParseUtil.getString(object, JsonProConfig.SIP_CALL_ID);
logger.error("" + type + "----" + jsonMap.toString());
switch (type) {
case "sip-double":
separateInnerIp(jsonMap, out, vsysId);
break;
case "rtp-double":
String rtpDouble4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
//1c2s2s2c3double
int commonStreamDir = JsonParseUtil.getInteger(object, JsonProConfig.STREAM_DIR);
/*
* 针对SIP日志进行处理
*/
if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) {
if (relationUtils.checkSipCompleteness(object)) {
if (commonStreamDir != JsonProConfig.DOUBLE) {
putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out);
} else {
separateInnerIp(object, out);
}
out.collect(new Tuple4<>(rtpDouble4Key, type, jsonMap, vsysId));
break;
case "violation":
out.collect(new Tuple4<>("", type, jsonMap, vsysId));
break;
//单向流的SIP
case "sip-single":
String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID);
String sipSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
if (StringUtil.isNotBlank(sipCallId)) {
putKeyAndMsg(jsonMap, sipCallId + sipSingle4Key, sipOriHmList, "SIP", vsysId, out);
} else {
out.collect(new Tuple3<>("", "violation", input));
out.collect(new Tuple4<>("", type, jsonMap, vsysId));
}
}
break;
//单向流的RTP
case "rtp-single":
String rtpSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
//对rtp单向流进行关联
putKeyAndMsg(jsonMap, rtpSingle4Key, rtpOriHmList, "RTP", vsysId, out);
break;
//内网的SIP
default:
logger.error("type is beyond expectation:" + type);
break;
/*
* 针对RTP日志进行处理
*/
if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
String clientIP = JsonParseUtil.getString(object, JsonProConfig.CLIENT_IP);
int clientPort = JsonParseUtil.getInteger(object, JsonProConfig.CLIENT_PORT);
String ServerIP = JsonParseUtil.getString(object, JsonProConfig.SERVER_IP);
int ServerPort = JsonParseUtil.getInteger(object, JsonProConfig.SERVER_PORT);
String rtpIpPort4Key = getFourKey(clientIP, clientPort, ServerIP, ServerPort);
if (commonStreamDir != JsonProConfig.DOUBLE) {
//对rtp单向流进行关联
putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, "RTP", out);
} else {
//RTP双向流,按四元组下发
out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input));
}
}
} catch (RuntimeException e) {
logger.error("parsing JSON or Unidirectional data flow fusion has exception! error is :" + e);
}
}
}
/*
* 定时发送SIP未关联上数据
*/
if (sipOriHmList.size() > 0) {
HashMap<String, String> tmpSipOriHmList = new HashMap<>(sipOriHmList);
HashMap<String, Map<String, Object>> tmpSipOriHmList = new HashMap<>(sipOriHmList);
sipOriHmList.clear();
for (String sipKey : tmpSipOriHmList.keySet()) {
String sipSingleMsg = tmpSipOriHmList.get(sipKey);
Map<String, Object> sipSingleMsg = tmpSipOriHmList.get(sipKey);
//sipKey为sip_call_id,未关联成功的sip是不能使用的
out.collect(new Tuple3<>(sipKey, "sip-single", sipSingleMsg));
out.collect(new Tuple4<>(sipKey, "sip-single", sipSingleMsg, 0));
}
}
/*
* 定时发送RTP未关联上数据
*/
if (rtpOriHmList.size() > 0) {
HashMap<String, String> tmpRtpOriHmList = new HashMap<>(rtpOriHmList);
HashMap<String, Map<String, Object>> tmpRtpOriHmList = new HashMap<>(rtpOriHmList);
rtpOriHmList.clear();
for (String rtpKey : tmpRtpOriHmList.keySet()) {
String rtpSingleMsg = tmpRtpOriHmList.get(rtpKey);
Map<String, Object> rtpSingleMsg = tmpRtpOriHmList.get(rtpKey);
//未关联成功的rtp还可以继续关联,因为有四元组
out.collect(new Tuple3<>(rtpKey, "rtp-single", rtpSingleMsg));
out.collect(new Tuple4<>(rtpKey, "rtp-single", rtpSingleMsg, 0));
}
}
}
/**
* 存放key并关联拼接对应Key
*/
@SuppressWarnings("unchecked")
private static void putKeyAndMsg(String message, String hmStrKey, HashMap<String, String> hashMapStr, String protocolType, Collector<Tuple3<String, String, String>> out) {
private static void putKeyAndMsg(Map<String, Object> secondSipOrRtpLog, String hmStrKey, HashMap<String, Map<String, Object>> hashMapStr, String protocolType, Integer vsysId, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out) {
//和上次存入的数据关联
if (hashMapStr.containsKey(hmStrKey)) {
HashMap<String, Object> jsonCommonMap = new HashMap<>(32);
String[] strArr = new String[2];
String firstMsg = hashMapStr.remove(hmStrKey);
Map<String, Object> firstSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(firstMsg, Map.class);
Map<String, Object> secondSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
HashMap<String, Object> jsonCommonMap = new HashMap<>(128);
Map<String, Object> firstSipOrRtpLog = hashMapStr.remove(hmStrKey);
//1c2s2s2c3double,1表示firstMsg为请求侧(c2s),合并时以它为准
if (JsonParseUtil.getInteger(firstSipOrRtpLog, JsonProConfig.STREAM_DIR) == 1) {
strArr[0] = message;
strArr[1] = firstMsg;
jsonCommonMap.putAll(secondSipOrRtpLog);
jsonCommonMap.putAll(firstSipOrRtpLog);
} else {
strArr[0] = firstMsg;
strArr[1] = message;
jsonCommonMap.putAll(firstSipOrRtpLog);
jsonCommonMap.putAll(secondSipOrRtpLog);
}
jsonCommonMap.putAll((Map<String, Object>) JsonMapper.fromJsonString(strArr[0], Map.class));
jsonCommonMap.putAll((Map<String, Object>) JsonMapper.fromJsonString(strArr[1], Map.class));
String sipTwoMsg = jsonCommonMap.toString();
accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, jsonCommonMap);
jsonCommonMap.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);
Map<String, Object> sipOrRtpCombin = (Map<String, Object>) JsonMapper.fromJsonString(sipTwoMsg, Map.class);
accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin);
sipOrRtpCombin.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);
if (JsonProConfig.SIP_MARK.equals(protocolType)) {
//手动关联SIP后区分内外网IP再下发
separateInnerIp(sipOrRtpCombin, out);
separateInnerIp(jsonCommonMap, out, vsysId);
} else if (JsonProConfig.RTP_MARK.equals(protocolType)) {
//手动关联RTP后按四元组下发
sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog));
out.collect(new Tuple3<>(hmStrKey, "rtp-two", JsonMapper.toJsonString(sipOrRtpCombin)));
jsonCommonMap.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog));
out.collect(new Tuple4<>(hmStrKey, "rtp-double", jsonCommonMap, vsysId));
}
} else {
hashMapStr.put(hmStrKey, message);
hashMapStr.put(hmStrKey, secondSipOrRtpLog);
}
}
/**
* 区分SIP的内外网IP,此时已经关联完成包含四元组,但未区分内外网IP
*/
private static void separateInnerIp(Map<String, Object> object, Collector<Tuple3<String, String, String>> out) {
private static void separateInnerIp(Map<String, Object> jsonMap, Collector<Tuple4<String, String, Map<String, Object>, Integer>> out, Integer vsysid) {
String sipOriginatorIp = JsonParseUtil.getString(object, JsonProConfig.SIP_ORIGINATOR_IP);
int sipOriginatorPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_ORIGINATOR_PORT);
String sipResponderIp = JsonParseUtil.getString(object, JsonProConfig.SIP_RESPONDER_IP);
int sipResponderPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_RESPONDER_PORT);
String sipOriginatorIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_ORIGINATOR_IP);
int sipOriginatorPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_ORIGINATOR_PORT);
String sipResponderIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_RESPONDER_IP);
int sipResponderPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_RESPONDER_PORT);
if (relationUtils.isInnerIp(sipOriginatorIp) || relationUtils.isInnerIp(sipResponderIp)) {
if (RelationUtils.isInnerIp(sipOriginatorIp) || RelationUtils.isInnerIp(sipResponderIp)) {
/*
* 按from-ip_from-port_to-ip_to-port
*/
@@ -176,7 +171,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
+ sipResponderIp + VoipRelationConfig.CORRELATION_STR
+ sipResponderPort;
//包含内网IP的SIP关联后数据
out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JsonMapper.toJsonString(object)));
out.collect(new Tuple4<>(sipInnerEmitKey, "sip-in", jsonMap, vsysid));
} else {
String sipIpPort4Key = getFourKey(sipOriginatorIp,
sipOriginatorPort,
@@ -184,7 +179,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
sipResponderPort);
//按照四元组的Key发送到下一个bolt
out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JsonMapper.toJsonString(object)));
out.collect(new Tuple4<>(sipIpPort4Key, "sip-double", jsonMap, vsysid));
}
}
@@ -199,7 +194,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
*/
private static String getFourKey(String commonClientIp, int commonClientPort, String commonServerIp, int commonServerPort) {
String ipPort4Key = "";
int comparePortResult = compareNum(commonClientPort, commonServerPort);
int comparePortResult = RelationUtils.compareNum(commonClientPort, commonServerPort);
/*
* 按端口比较
@@ -247,7 +242,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
private static String compareQuadruple(String clientIp, String serverIp, int clientPort, int serverPort) {
long clientIpNum = IPUtil.getIpHostDesimal(clientIp);
long serverIpNum = IPUtil.getIpHostDesimal(serverIp);
int compareIpResult = compareNum(clientIpNum, serverIpNum);
int compareIpResult = RelationUtils.compareNum(clientIpNum, serverIpNum);
switch (compareIpResult) {
//clientIpNum > serverIpNum
case 1:
@@ -284,57 +279,27 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction<String, Tup
*/
private static void accumulateMsg(Map<String, Object> firstSipOrRtpLog, Map<String, Object> secondSipOrRtpLog, Map<String, Object> sipOrRtpCombin) {
//common_sessions
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS);
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS);
//common_c2s_pkt_num
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_PKT_NUM);
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_PKT_NUM);
//common_s2c_pkt_num
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_PKT_NUM);
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_PKT_NUM);
//common_c2s_byte_num
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_BYTE_NUM);
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_BYTE_NUM);
//common_s2c_byte_num
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_BYTE_NUM);
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_BYTE_NUM);
//common_c2s_ipfrag_num
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_IPFRAG_NUM);
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_IPFRAG_NUM);
//common_s2c_ipfrag_num
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_IPFRAG_NUM);
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_IPFRAG_NUM);
//common_c2s_tcp_lostlen
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_LOSTLEN);
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_LOSTLEN);
//common_s2c_tcp_lostlen
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_LOSTLEN);
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_LOSTLEN);
//common_c2s_tcp_unorder_num
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_UNORDER_NUM);
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_UNORDER_NUM);
//common_s2c_tcp_unorder_num
relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_UNORDER_NUM);
}
/**
* int类型
* 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
*
* @param numOne 数值1
* @param numTwo 数值2
*/
private static int compareNum(int numOne, int numTwo) {
if (numOne > 0 && numTwo > 0) {
return Integer.compare(numOne, numTwo);
} else {
return -2;
}
}
/**
* long类型
* 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
*
* @param numOne 数值1
* @param numTwo 数值2
*/
private static int compareNum(long numOne, long numTwo) {
if (numOne > 0 && numTwo > 0) {
return Long.compare(numOne, numTwo);
} else {
return -2;
}
RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_UNORDER_NUM);
}
/**

View File

@@ -1,14 +1,16 @@
package com.zdjizhi.utils.functions;
package com.zdjizhi.operator.window;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.tools.utils.RelationUtils;
import com.zdjizhi.tools.json.JsonParseUtil;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -16,11 +18,11 @@ import java.util.*;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Package com.zdjizhi.operator.window
* @Description:
* @date 2021/7/2113:55
* @date 2022/9/617:46
*/
public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple3<String, String, String>, String, TimeWindow> {
public class VoipCalibrationWindowFunction extends ProcessWindowFunction<Tuple4<String, String, Map<String, Object>, Integer>, String, Tuple2<String, Integer>, TimeWindow> {
private static final Log logger = LogFactory.get();
/**
@@ -30,7 +32,7 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
* 存放数据:rtp-single,rtp-two,sip-two
* 不存放的数据:sip-single与sip-in
*/
private static HashMap<String, LinkedList<String>> combineSRHmList = new HashMap<>(16);
private static HashMap<String, LinkedList<Map<String, Object>>> combineSRHmList = new HashMap<>(32);
/**
* 二次关联用HashMap
@@ -39,31 +41,33 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
* 存放数据:rtp-single,rtp-two,sip-two
* 不存放的数据:sip-single与sip-in
*/
private static HashMap<String, LinkedList<String>> secCombineSRHmList = new HashMap<>(16);
private static HashMap<String, LinkedList<Map<String, Object>>> secCombineSRHmList = new HashMap<>(32);
@Override
public void process(Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) throws Exception {
for (Tuple3<String, String, String> tuple : input) {
public void process(Tuple2<String, Integer> key, Context context, Iterable<Tuple4<String, String, Map<String, Object>, Integer>> input, Collector<String> out) throws Exception {
for (Tuple4<String, String, Map<String, Object>, Integer> tuple : input) {
//拼接的四元组
String fourKey = tuple.f0;
//已关联的sip,rtp;未关联的sip,rtp;内网的sip
String type = tuple.f1;
String msg = tuple.f2;
Map<String, Object> jsonMap = tuple.f2;
logger.error("" + fourKey + "----" + type);
switch (type) {
//单向流对准后的SIP
case "sip-two":
case "sip-double":
//单向流对准后的RTP
case "rtp-two":
//对不上的RTP
case "rtp-single":
putKeyAndMsg(msg, fourKey, combineSRHmList);
case "rtp-double":
putKeyAndMsg(jsonMap, fourKey, combineSRHmList);
break;
//单向流的SIP
case "sip-single":
//单向流的RTP
case "rtp-single":
//内网的SIP
case "sip-in":
//违规的日志
case "violation":
output.collect(msg);
out.collect(JsonMapper.toJsonString(jsonMap));
break;
default:
logger.error("type is beyond expectation:" + type);
@@ -72,82 +76,91 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
}
}
//初次关联
tickCombineHmList(combineSRHmList, output);
tickCombineHmList(combineSRHmList, out);
//和缓存中的数据二次关联
tickCombineHmList(secCombineSRHmList, output);
tickCombineHmList(secCombineSRHmList, out);
}
/**
* 存放key并添加对应List
*/
private static void putKeyAndMsg(Map<String, Object> message, String fourStrKey, HashMap<String, LinkedList<Map<String, Object>>> combineSRHmList) {
if (combineSRHmList.containsKey(fourStrKey)) {
LinkedList<Map<String, Object>> tmpList = combineSRHmList.get(fourStrKey);
tmpList.add(message);
combineSRHmList.put(fourStrKey, tmpList);
} else {
LinkedList<Map<String, Object>> tmpList = new LinkedList<>();
tmpList.add(message);
combineSRHmList.put(fourStrKey, tmpList);
}
}
/**
* 定时关联,包括初次关联以及后续二次关联
*
* @param combineHmList
*/
@SuppressWarnings("unchecked")
private void tickCombineHmList(HashMap<String, LinkedList<String>> combineHmList, Collector<String> output) {
private void tickCombineHmList(HashMap<String, LinkedList<Map<String, Object>>> combineHmList, Collector<String> output) {
if (combineHmList.size() > 0) {
long nowTime = System.currentTimeMillis() / 1000;
HashMap<String, LinkedList<String>> tempCombineSRhmList = new HashMap<>(combineHmList);
HashMap<String, LinkedList<Map<String, Object>>> tempCombineSRhmList = new HashMap<>(combineHmList);
combineHmList.clear();
for (String fourStrKey : tempCombineSRhmList.keySet()) {
LinkedList<String> tempList = tempCombineSRhmList.get(fourStrKey);
//包含SIP和RTP
int listSize = tempList.size();
LinkedList<Map<String, Object>> tempCombineSRList = tempCombineSRhmList.get(fourStrKey);
//包含SIP和RTP,集合大于1则可能是sip和rtp多条sip或多条rtp
int listSize = tempCombineSRList.size();
if (listSize > 1) {
List<String> sipBeanArr = new ArrayList<>();
List<String> rtpBeanArr = new ArrayList<>();
List<Map<String, Object>> sipBeanArr = new ArrayList<>();
List<Map<String, Object>> rtpBeanArr = new ArrayList<>();
for (String message : tempList) {
Map<String, Object> tempSipOrRtpLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
for (Map<String, Object> tempSipOrRtpLog : tempCombineSRList) {
String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE);
if (JsonProConfig.SIP_MARK.equals(schemaType)) {
sipBeanArr.add(message);
sipBeanArr.add(tempSipOrRtpLog);
} else if (JsonProConfig.RTP_MARK.equals(schemaType)) {
rtpBeanArr.add(message);
rtpBeanArr.add(tempSipOrRtpLog);
}
}
int rtpSize = rtpBeanArr.size();
int sipSize = sipBeanArr.size();
//只允许一对多的情况其余视为异常数据
if (rtpSize == 1 && sipSize >= 1) {
for (String sipMessage : sipBeanArr) {
Map<String, Object> rtpLog = (Map<String, Object>) JsonMapper.fromJsonString(rtpBeanArr.get(0), Map.class);
Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(sipMessage, Map.class);
for (Map<String, Object> voIpLog : sipBeanArr) {
Map<String, Object> rtpLog = rtpBeanArr.get(0);
accumulateVoipMsg(voIpLog, rtpLog);
JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog));
//四元组,voip,关联后的数据
output.collect(mergeJson(voIpLog, rtpLog));
}
} else if (sipSize == 1 && rtpSize >= 1) {
for (String rtpMessage : rtpBeanArr) {
Map<String, Object> rtpLog = (Map<String, Object>) JsonMapper.fromJsonString(rtpMessage, Map.class);
Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(sipBeanArr.get(0), Map.class);
for (Map<String, Object> rtpLog : rtpBeanArr) {
HashMap<String, Object> voIpLog = new HashMap<>(128);
voIpLog.putAll(sipBeanArr.get(0));
accumulateVoipMsg(voIpLog, rtpLog);
JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voIpLog));
//四元组,voip,关联后的数据
output.collect(mergeJson(voIpLog, rtpLog));
}
} else {
logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical");
sendErrorLogToKafka(sipBeanArr, output);
sendErrorLogToKafka(rtpBeanArr, output);
logger.error("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical");
sendViolationLogs(sipBeanArr, output);
sendViolationLogs(rtpBeanArr, output);
}
} else {
String msg = tempList.get(0);
Map<String, Object> voIpLog = (Map<String, Object>) JsonMapper.fromJsonString(msg, Map.class);
Map<String, Object> voIpLog = tempCombineSRList.get(0);
long commonEndTime = JsonParseUtil.getLong(voIpLog, JsonProConfig.END_TIME);
long intervalTime = nowTime - commonEndTime;
logger.error("VoIP日志时间差值记录" + intervalTime);
if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) {
putKeyAndMsg(msg, fourStrKey, secCombineSRHmList);
logger.warn("当前日志时间未超过二次对准时间,进入队列等待再次对准,日志时间差:" + intervalTime);
putKeyAndMsg(voIpLog, fourStrKey, secCombineSRHmList);
} else {
sendDirectlyOneElement(msg, voIpLog, output);
output.collect(JsonMapper.toJsonString(voIpLog));
}
}
}
@@ -162,27 +175,27 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
*/
private void accumulateVoipMsg(Map<String, Object> voIpLog, Map<String, Object> rtpLog) {
//common_sessions
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.SESSIONS);
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.SESSIONS);
//common_c2s_pkt_num
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_PKT_NUM);
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_PKT_NUM);
//common_s2c_pkt_num
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_PKT_NUM);
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_PKT_NUM);
//common_c2s_byte_num
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_BYTE_NUM);
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_BYTE_NUM);
//common_s2c_byte_num
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_BYTE_NUM);
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_BYTE_NUM);
//common_c2s_ipfrag_num
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
//common_s2c_ipfrag_num
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
//common_c2s_tcp_lostlen
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
//common_s2c_tcp_lostlen
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
//common_c2s_tcp_unorder_num
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
//common_s2c_tcp_unorder_num
relationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
RelationUtils.metricSum(voIpLog, rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
}
/**
@@ -203,51 +216,13 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
}
}
/**
* 存放key并添加对应List
*/
private static void putKeyAndMsg(String message, String fourStrKey, HashMap<String, LinkedList<String>> combineSRHmList) {
if (combineSRHmList.containsKey(fourStrKey)) {
LinkedList<String> tmpList = combineSRHmList.get(fourStrKey);
tmpList.add(message);
combineSRHmList.put(fourStrKey, tmpList);
} else {
LinkedList<String> tmpList = new LinkedList<>();
tmpList.add(message);
combineSRHmList.put(fourStrKey, tmpList);
}
}
/**
* 判断RTP主叫方向-测试
*
* @param rtpLog RTP原始日志
* @param voIpLog 融合后VOIP日志
* @return 方向 0未知 1c2s 2s2c
*/
private static int judgeDirection(Map<String, Object> rtpLog, Map<String, Object> voIpLog) {
String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP);
String sipOriginatorIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_ORIGINATOR_IP);
String sipResponderIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_RESPONDER_IP);
if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) {
return 1;
} else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) {
return 2;
}
return 0;
}
/**
* 发送不符合逻辑的日志到kafka
*/
private static void sendErrorLogToKafka(List<String> logList, Collector<String> output) {
if (logList.size() > 0) {
for (String log : logList) {
output.collect(log);
private static void sendViolationLogs(List<Map<String, Object>> violationLogs, Collector<String> output) {
if (violationLogs.size() > 0) {
for (Map<String, Object> log : violationLogs) {
output.collect(JsonMapper.toJsonString(log));
}
}
}
@@ -261,6 +236,8 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction<Tuple
int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C);
String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH);
JsonParseUtil.setValue(voIpLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_ORIGINATOR_DIR, RelationUtils.judgeDirection(rtpLog, voIpLog));
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
JsonParseUtil.setValue(voIpLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);

View File

@@ -1,4 +1,4 @@
package com.zdjizhi.utils.exception;
package com.zdjizhi.tools.exception;
/**
* @author qidaijie

View File

@@ -1,4 +1,4 @@
package com.zdjizhi.utils.json;
package com.zdjizhi.tools.json;
import cn.hutool.log.Log;
@@ -160,29 +160,6 @@ public class JsonParseUtil {
return tmpMap;
}
/**
* 获取属性值的方法
*
* @param obj 对象
* @param property key
* @return 属性的值
*/
public static Object getValue(Object obj, String property) {
try {
BeanMap beanMap = BeanMap.create(obj);
if (beanMap.containsKey(property)) {
return beanMap.get(property);
} else {
return null;
}
} catch (RuntimeException e) {
logger.error("获取json-value异常异常key" + property + "异常信息为:" + e);
return null;
}
}
/**
* 获取属性值的方法
*
@@ -215,22 +192,6 @@ public class JsonParseUtil {
return longVal;
}
/**
* long 类型检验转换方法,若为空返回基础值
*
* @return Long value
*/
public static Long getLong(Object jsonMap, String property) {
Object value = getValue(jsonMap, property);
Long longVal = TypeUtils.castToLong(value);
if (longVal == null) {
return 0L;
}
return longVal;
}
/**
* int 类型检验转换方法,若为空返回基础值
*
@@ -243,21 +204,7 @@ public class JsonParseUtil {
if (intVal == null) {
return 0;
}
return intVal;
}
/**
* int 类型检验转换方法,若为空返回基础值
*
* @return int value
*/
public static int getInteger(Object jsonMap, String property) {
Object value = getValue(jsonMap, property);
Integer intVal = TypeUtils.castToInt(value);
if (intVal == null) {
return 0;
}
return intVal;
}
@@ -278,34 +225,10 @@ public class JsonParseUtil {
return value.toString();
}
/**
* Object 方式获取getString类型字段
*
* @param jsonObject
* @param property
* @return
*/
public static String getString(Object jsonObject, String property) {
Object value = getValue(jsonObject, property);
if (value == null) {
return null;
}
if (value instanceof Map) {
return JsonMapper.toJsonString(value);
}
if (value instanceof List) {
return JsonMapper.toJsonString(value);
}
return value.toString();
}
/**
* 更新属性值的方法
*
* @param jsonMap 原始日志json map
* @param jsonMap 原始日志json parse
* @param property 更新的key
* @param value 更新的值
*/
@@ -317,38 +240,6 @@ public class JsonParseUtil {
}
}
/**
* 更新属性值的方法
*
* @param obj 对象
* @param property 更新的key
* @param value 更新的值
*/
public static void setValue(Object obj, String property, Object value) {
try {
BeanMap beanMap = BeanMap.create(obj);
beanMap.put(property, value);
} catch (ClassCastException e) {
logger.error("赋予实体类错误类型数据", e);
}
}
/**
* 根据反射生成对象的方法
*
* @param properties 反射类用的map
* @return 生成的Object类型的对象
*/
public static Object generateObject(Map properties) {
BeanGenerator generator = new BeanGenerator();
Set keySet = properties.keySet();
for (Object aKeySet : keySet) {
String key = (String) aKeySet;
generator.addProperty(key, (Class) properties.get(key));
}
return generator.create();
}
/**
* 通过获取String类型的网关schema链接来获取map用于生成一个Object类型的对象
* <p>

View File

@@ -1,9 +1,9 @@
package com.zdjizhi.utils.json;
package com.zdjizhi.tools.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.exception.VoipRelationException;
import com.zdjizhi.tools.exception.VoipRelationException;
import java.util.List;
import java.util.Map;
@@ -53,7 +53,7 @@ class JsonTypeUtil {
return (Map) value;
}
throw new VoipRelationException("can not cast to map, value : " + value);
throw new VoipRelationException("can not cast to parse, value : " + value);
}
/**

View File

@@ -1,10 +1,10 @@
package com.zdjizhi.utils.json;
package com.zdjizhi.tools.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.exception.VoipRelationException;
import com.zdjizhi.tools.exception.VoipRelationException;
/**
@@ -39,14 +39,6 @@ public class TypeUtils {
return ((Number) value).longValue();
}
// if (value instanceof Map) {
// return (Map) value;
// }
//
// if (value instanceof List) {
// return Collections.singletonList(value.toString());
// }
if (value instanceof Boolean) {
return (Boolean) value ? 1 : 0;
}
@@ -70,11 +62,6 @@ public class TypeUtils {
return (Integer) value;
}
//此判断数值超范围不抛出异常会截取成对应类型数值
// if (value instanceof Number) {
// return ((Number) value).intValue();
// }
if (value instanceof String) {
String strVal = (String) value;
if (StringUtil.isBlank(strVal)) {
@@ -112,11 +99,6 @@ public class TypeUtils {
return (Double) value;
}
//此判断数值超范围不抛出异常会截取成对应类型数值
// if (value instanceof Number) {
// return ((Number) value).doubleValue();
// }
if (value instanceof String) {
String strVal = (String) value;

View File

@@ -1,4 +1,4 @@
package com.zdjizhi.utils.kafka;
package com.zdjizhi.tools.kafka;
import com.zdjizhi.common.VoipRelationConfig;
import org.apache.kafka.common.config.SslConfigs;

View File

@@ -1,9 +1,8 @@
package com.zdjizhi.utils.kafka;
package com.zdjizhi.tools.kafka;
import com.zdjizhi.common.VoipRelationConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;

View File

@@ -1,4 +1,4 @@
package com.zdjizhi.utils.kafka;
package com.zdjizhi.tools.kafka;
import com.zdjizhi.common.VoipRelationConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -42,9 +42,6 @@ public class KafkaProducer {
//启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们
kafkaProducer.setLogFailuresOnly(true);
//写入kafka的消息携带时间戳
// kafkaProducer.setWriteTimestampToKafka(true);
return kafkaProducer;
}
}

View File

@@ -1,7 +1,7 @@
package com.zdjizhi.utils.system;
package com.zdjizhi.tools.system;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.exception.VoipRelationException;
import com.zdjizhi.tools.exception.VoipRelationException;
import java.io.IOException;
import java.util.Locale;

View File

@@ -0,0 +1,134 @@
package com.zdjizhi.tools.utils;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IPUtil;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.tools.json.JsonParseUtil;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.operator
* @Description:
* @date 2022/4/1911:30
*/
public class RelationUtils {
/**
* 将A日志内的某指标数据与B日志进行累加计算,并写入A日志中
*
* @param firstLog A日志
* @param secondLog B日志
* @param key 指标 json key
*/
public static void metricSum(Map<String, Object> firstLog, Map<String, Object> secondLog, String key) {
Long firstMetric = JsonParseUtil.getLong(firstLog, key);
Long secondMetric = JsonParseUtil.getLong(secondLog, key);
Long sum = firstMetric + secondMetric;
JsonParseUtil.setValue(firstLog, key, sum);
}
/**
* 将A日志内的某指标数据与B日志进行累加计算,并写入C日志中
*
* @param firstLog A日志
* @param secondLog B日志
* @param otherLog C日志
* @param key 指标 json key
*/
public static void metricSumSetOtherLog(Map<String, Object> firstLog, Map<String, Object> secondLog, Map<String, Object> otherLog, String key) {
Long firstMetric = JsonParseUtil.getLong(firstLog, key);
Long secondMetric = JsonParseUtil.getLong(secondLog, key);
Long sum = firstMetric + secondMetric;
JsonParseUtil.setValue(otherLog, key, sum);
}
/**
* 校验sip日志必须包含协商四元组否则将原样输出不处理
*
* @param sipLog SIP日志
* @return true or false
*/
public static boolean checkSipCompleteness(Map<String, Object> sipLog) {
return sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) &&
sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) &&
sipLog.containsKey(JsonProConfig.SIP_RESPONDER_IP) &&
sipLog.containsKey(JsonProConfig.SIP_RESPONDER_PORT);
}
/**
* 是否为内网IP
*
* @param ip ip Address
* @return true or false
*/
public static boolean isInnerIp(String ip) {
if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) {
//为空或者为特定IP时也算作内网IP
return StringUtil.isBlank(ip) || IPUtil.internalIp(ip);
}
return false;
}
/**
* int类型
* 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
*
* @param numOne 数值1
* @param numTwo 数值2
*/
public static int compareNum(int numOne, int numTwo) {
if (numOne > 0 && numTwo > 0) {
return Integer.compare(numOne, numTwo);
} else {
return -2;
}
}
/**
* long类型
* 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
*
* @param numOne 数值1
* @param numTwo 数值2
*/
public static int compareNum(long numOne, long numTwo) {
if (numOne > 0 && numTwo > 0) {
return Long.compare(numOne, numTwo);
} else {
return -2;
}
}
/**
* 判断RTP主叫方向
*
* @param rtpLog RTP原始日志
* @param voIpLog 融合后VOIP日志
* @return 方向 0未知 1c2s 2s2c
*/
public static int judgeDirection(Map<String, Object> rtpLog, Map<String, Object> voIpLog) {
String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP);
String sipOriginatorIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_ORIGINATOR_IP);
String sipResponderIp = JsonParseUtil.getString(voIpLog, JsonProConfig.SIP_RESPONDER_IP);
if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) {
return 1;
} else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) {
return 2;
}
return 0;
}
}

View File

@@ -3,16 +3,23 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.functions.*;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.api.java.tuple.Tuple3;
import com.zdjizhi.operator.filter.NullFilterFunction;
import com.zdjizhi.operator.group.MergeUniFlowKeyByFunction;
import com.zdjizhi.operator.group.VoipCalibrationKeyByFunction;
import com.zdjizhi.operator.parse.ParseMapFunction;
import com.zdjizhi.operator.window.MergeUniFlowWindowFunction;
import com.zdjizhi.operator.window.VoipCalibrationWindowFunction;
import com.zdjizhi.tools.kafka.KafkaConsumer;
import com.zdjizhi.tools.kafka.KafkaProducer;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Map;
/**
* @author qidaijie
@@ -28,20 +35,28 @@ public class VoIpRelationTopology {
DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer());
SingleOutputStreamOperator<Tuple3<String, String, String>> sipCorrelation =
streamSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME)))
.process(new OneSidedWindowFunction()).name("OneSidedWindow");
SingleOutputStreamOperator<String> window = sipCorrelation.windowAll(
TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME)))
.process(new SipCalibrationWindowFunction()).name("SipCalibrationWindowFunction");
SingleOutputStreamOperator<Tuple4<String, String, Map<String, Object>, Integer>> mergeUniFlowWindow = streamSource.filter(new NullFilterFunction())
.map(new ParseMapFunction())
.keyBy(new MergeUniFlowKeyByFunction())
.window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME)))
.process(new MergeUniFlowWindowFunction())
.name("MergeUnidirectionalFlowWindow").setParallelism(VoipRelationConfig.MERGE_UNIFLOW_WINDOW_PARALLELISM);
window.addSink(KafkaProducer.getKafkaProducer()).name("VoIpLogSinkKafka");
SingleOutputStreamOperator<String> mergeUnidirectionalFlowWindow = mergeUniFlowWindow.keyBy(new VoipCalibrationKeyByFunction())
.window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME)))
.process(new VoipCalibrationWindowFunction())
.name("MergeUnidirectionalFlowWindow").setParallelism(VoipRelationConfig.CALIBRATION_WINDOW_PARALLELISM);
mergeUnidirectionalFlowWindow.addSink(KafkaProducer.getKafkaProducer()).name("VoIpLogSinkKafka");
try {
environment.execute(args[0]);
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :" + e);
logger.error("This Flink task start ERROR! Exception information is :" + e.getMessage());
e.printStackTrace();
}
}
}

View File

@@ -1,81 +0,0 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IPUtil;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2022/4/1911:30
*/
class relationUtils {
/**
* 将A日志内的某指标数据与B日志进行累加计算,并写入A日志中
*
* @param firstLog A日志
* @param secondLog B日志
* @param key 指标 json key
*/
static void metricSum(Map<String, Object> firstLog, Map<String, Object> secondLog, String key) {
Long firstMetric = JsonParseUtil.getLong(firstLog, key);
Long secondMetric = JsonParseUtil.getLong(secondLog, key);
Long sum = firstMetric + secondMetric;
JsonParseUtil.setValue(firstLog, key, sum);
}
/**
* 将A日志内的某指标数据与B日志进行累加计算,并写入C日志中
*
* @param firstLog A日志
* @param secondLog B日志
* @param otherLog C日志
* @param key 指标 json key
*/
static void metricSumSetOtherLog(Map<String, Object> firstLog, Map<String, Object> secondLog, Map<String, Object> otherLog, String key) {
Long firstMetric = JsonParseUtil.getLong(firstLog, key);
Long secondMetric = JsonParseUtil.getLong(secondLog, key);
Long sum = firstMetric + secondMetric;
JsonParseUtil.setValue(otherLog, key, sum);
}
/**
* 校验sip日志必须包含协商四元组否则将原样输出不处理
*
* @param sipLog SIP日志
* @return true or false
*/
static boolean checkSipCompleteness(Map<String, Object> sipLog) {
return sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_IP) &&
sipLog.containsKey(JsonProConfig.SIP_ORIGINATOR_PORT) &&
sipLog.containsKey(JsonProConfig.SIP_RESPONDER_IP) &&
sipLog.containsKey(JsonProConfig.SIP_RESPONDER_PORT);
}
/**
* 是否为内网IP
*
* @param ip ip Address
* @return true or false
*/
static boolean isInnerIp(String ip) {
if (VoipRelationConfig.CHECK_INNER_NETWORK == 1) {
//为空或者为特定IP时也算作内网IP
return StringUtil.isBlank(ip) || IPUtil.internalIp(ip);
}
return false;
}
}

View File

@@ -1,77 +0,0 @@
package com.zdjizhi.utils.http;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* 获取网关schema的工具类
*
* @author qidaijie
*/
public class HttpClientUtil {
private static final Log logger = LogFactory.get();
/**
* 请求网关获取schema
*
* @param http 网关url
* @return schema
*/
public static String requestByGetMethod(String http) {
CloseableHttpClient httpClient = HttpClients.createDefault();
StringBuilder entityStringBuilder;
HttpGet get = new HttpGet(http);
BufferedReader bufferedReader = null;
CloseableHttpResponse httpResponse = null;
try {
httpResponse = httpClient.execute(get);
HttpEntity entity = httpResponse.getEntity();
entityStringBuilder = new StringBuilder();
if (null != entity) {
bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
int intC;
while ((intC = bufferedReader.read()) != -1) {
char c = (char) intC;
if (c == '\n') {
break;
}
entityStringBuilder.append(c);
}
return entityStringBuilder.toString();
}
} catch (IOException e) {
logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
} finally {
if (httpClient != null) {
try {
httpClient.close();
} catch (IOException e) {
logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
}
}
if (httpResponse != null) {
try {
httpResponse.close();
} catch (IOException e) {
logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
}
}
if (bufferedReader != null) {
IOUtils.closeQuietly(bufferedReader);
}
}
return "";
}
}

3
src/test/testdata/differentVSYS vendored Normal file
View File

@@ -0,0 +1,3 @@
{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_123.pcap","raw_log_status":"CLOSE","common_vsys_id":2}
{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_123.pcap","raw_log_status":"CLOSE","common_vsys_id":3}
{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:test1@192.168.40.158>","sip_responder_description":"\"test2\"<sip:test2@192.168.40.158>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50730RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50730,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0,"common_vsys_id":2}

View File

@@ -0,0 +1,3 @@
{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":1,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_123.pcap","raw_log_status":"CLOSE","common_vsys_id":2}
{"common_action":0,"common_address_list":"50730-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50730,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":2,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50730_456.pcap","raw_log_status":"CLOSE","common_vsys_id":2}
{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:test1@192.168.40.158>","sip_responder_description":"\"test2\"<sip:test2@192.168.40.158>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50730RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50730,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0,"common_vsys_id":2}

3
src/test/testdata/oneRTPUniFlowSIP vendored Normal file
View File

@@ -0,0 +1,3 @@
{"common_action":0,"common_address_list":"50732-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50732,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50732_123.pcap","raw_log_status":"CLOSE"}
{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:test1@192.168.40.158>","sip_responder_description":"\"test2\"<sip:test2@192.168.40.158>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50732RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650732\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50732,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":1,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:test1@192.168.40.158>","sip_responder_description":"\"test2\"<sip:test2@192.168.40.158>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50732RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650732\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50732,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":2,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"8694777237350350","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}

3
src/test/testdata/oneRTPtwoSIP vendored Normal file
View File

@@ -0,0 +1,3 @@
{"common_action":0,"common_address_list":"50731-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50731,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50731_123.pcap","raw_log_status":"CLOSE"}
{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:test1@192.168.40.158>","sip_responder_description":"\"test2\"<sip:test2@192.168.40.158>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50731RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650731\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50731,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"8694777237350350","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:test1@192.168.40.158>","sip_responder_description":"\"test2\"<sip:test2@192.168.40.158>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50731RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650731\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50731,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"8694777237333333","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}

3
src/test/testdata/oneSIPUniFlowRTP vendored Normal file
View File

@@ -0,0 +1,3 @@
{"common_action":0,"common_address_list":"50734-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50734,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":1,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50734_123.pcap","raw_log_status":"CLOSE"}
{"common_action":0,"common_address_list":"50734-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50734,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":2,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50734_456.pcap","raw_log_status":"CLOSE"}
{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:test1@192.168.40.158>","sip_responder_description":"\"test2\"<sip:test2@192.168.40.158>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50734RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650734\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50734,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}

3
src/test/testdata/oneSIPtwoRTP vendored Normal file
View File

@@ -0,0 +1,3 @@
{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:test1@192.168.40.158>","sip_responder_description":"\"test2\"<sip:test2@192.168.40.158>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50733RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650733\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50733,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
{"common_action":0,"common_address_list":"50733-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50733,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50733_123.pcap","raw_log_status":"CLOSE"}
{"common_action":0,"common_address_list":"50733-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50733,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50733_456.pcap","raw_log_status":"CLOSE"}

4
src/test/testdata/twoRTPandSIP vendored Normal file
View File

@@ -0,0 +1,4 @@
{"common_action":0,"common_address_list":"50735-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50735,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50735_123.pcap","raw_log_status":"CLOSE"}
{"common_action":0,"common_address_list":"50735-20730-192.168.36.136-192.168.56.27","common_address_type":4,"common_app_behavior":"voice_call","common_app_id":{"UNKNOWN":[{"app_id":4,"app_name":"unknown","packet_sequence":22,"surrogate_id":0}]},"common_app_identify_info":[{"app_name":"unknown","packet_sequence":22}],"common_app_label":"unknown","common_c2s_byte_num":64860,"common_c2s_pkt_num":304,"common_client_ip":"192.168.36.136","common_client_port":50735,"common_con_duration_ms":6264,"common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_direction":73,"common_end_time":1663123136,"common_l4_protocol":"IPv4_UDP","common_l7_protocol":"STUN.RTP","common_policy_id":0,"common_s2c_byte_num":66144,"common_s2c_pkt_num":310,"common_schema_type":"RTP","common_server_ip":"192.168.56.27","common_server_port":20730,"common_service":4,"common_sled_ip":"192.168.40.81","common_start_time":1663123136,"common_stream_dir":3,"common_stream_trace_id":"869477723735406984","rtp_pcap_path":"http://192.168.44.67:9098/hos/rtp_hos_bucket/192.168.56.27_192.168.36.136_20730_50735_123.pcap","raw_log_status":"CLOSE"}
{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:test1@192.168.40.158>","sip_responder_description":"\"test2\"<sip:test2@192.168.40.158>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50735RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650735\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50735,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}
{"common_schema_type":"SIP","common_sessions":1,"sip_call_id":"NDNjYmJkMDJhNGJmYmUyN2EwZjdmYzNjMWE1YTRmYjc.","sip_originator_description":"\"test1\"<sip:test1@192.168.40.158>","sip_responder_description":"\"test2\"<sip:test2@192.168.40.158>","sip_user_agent":"eyeBeamrelease1011dstamp40820","sip_originator_sdp_content":"v=0\r\no=-52INIP4192.168.56.27\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.56.27\r\nt=00\r\nm=audio20730RTP/AVP0818101\r\na=alt:11:UmXquXbwvg/CQxTA192.168.56.2720730\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:F934D61F09C84591BB3ECEECE5993BB8\r\n","sip_originator_sdp_connect_ip":"192.168.56.27","sip_originator_sdp_media_port":20730,"sip_originator_sdp_media_type":"18G729/8000","sip_server":"OpenSIPS(2.4.9(x86_64/linux))","sip_responder_sdp_content":"v=0\r\no=-62INIP4192.168.36.136\r\ns=CounterPatheyeBeam1.5\r\nc=INIP4192.168.36.136\r\nt=00\r\nm=audio50735RTP/AVP0818101\r\na=alt:11:sXlLExDmMtTnircb192.168.36.13650735\r\na=fmtp:18annexb=no\r\na=fmtp:1010-15\r\na=rtpmap:18G729/8000\r\na=rtpmap:101telephone-event/8000\r\na=sendrecv\r\na=x-rtp-session-id:4342C45E30624AA085D078ABE24D2D33\r\n","sip_responder_sdp_connect_ip":"192.168.36.136","sip_responder_sdp_media_port":50735,"sip_responder_sdp_media_type":"18G729/8000","sip_duration_s":7,"sip_bye":"responder","common_protocol_label":"ETHERNET.IPv4.UDP","common_c2s_ipfrag_num":0,"common_s2c_ipfrag_num":0,"common_direction":69,"common_l7_protocol":"SIP","common_server_ip":"192.168.40.158","common_client_ip":"192.168.56.27","common_server_port":5060,"common_client_port":32196,"common_stream_dir":3,"common_address_type":4,"common_address_list":"32196-5060-192.168.56.27-192.168.40.158","common_start_time":1663123136,"common_end_time":1663123136,"common_con_duration_ms":17043,"common_s2c_pkt_num":9,"common_s2c_byte_num":5375,"common_c2s_pkt_num":9,"common_c2s_byte_num":6563,"common_stream_trace_id":"869477723735079299","common_l4_protocol":"IPv4_UDP","common_sled_ip":"192.168.40.81","common_device_id":"21426003","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-three\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-three\"}]}","common_policy_id":0,"common_service":5,"common_action":0}