diff --git a/pom.xml b/pom.xml
index e063681..e5435ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
log-stream-voip-relation
- 220418-Nacos
+ 220914-VSYS
log-stream-voip-relation
http://www.example.com
@@ -23,10 +23,10 @@
maven-ali
http://maven.aliyun.com/nexus/content/groups/public/
-
+ true
-
+ true
fail
@@ -207,23 +207,10 @@
2.4.0
-
cn.hutool
hutool-all
- 5.5.2
-
-
-
- org.slf4j
- slf4j-api
- 1.7.21
-
-
-
- org.slf4j
- slf4j-log4j12
- 1.7.21
+ 5.7.17
diff --git a/properties/default_config.properties b/properties/default_config.properties
index da4adca..c750148 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -48,8 +48,6 @@ nacos.pin=nacos
#nacos group
nacos.group=Galaxy
-#====================Topology Default====================#
-#check ip is Inner network;0 off, 1 on.
-check.inner.network=0
+
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index f875d95..9cd2e94 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -15,7 +15,7 @@ tools.library=D:\\workerspace\\dat\\
nacos.server=192.168.44.12:8848
#nacos namespace
-nacos.schema.namespace=flink
+nacos.schema.namespace=test
#nacos data id
nacos.data.id=voip_record.json
@@ -32,13 +32,19 @@ group.id=mytest-1
#--------------------------------topology配置------------------------------#
#map函数并行度
-window.parallelism=1
+merge.uniflow.window.parallelism=1
-#voip日志对准窗口时间 seconds
-voip.calibration.window.time=60
+#map函数并行度
+calibration.window.parallelism=1
#单向流对准窗口时间 seconds
one.sided.window.time=10
+#voip日志对准窗口时间 seconds
+voip.calibration.window.time=30
+
#voip二次对准时间 seconds
-sec.combine.sr.cache.secs=120
\ No newline at end of file
+sec.combine.sr.cache.secs=60
+
+#check ip is Inner network;0 off, 1 on.
+check.inner.network=0
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/JsonProConfig.java b/src/main/java/com/zdjizhi/common/JsonProConfig.java
index 6e9d437..11de17c 100644
--- a/src/main/java/com/zdjizhi/common/JsonProConfig.java
+++ b/src/main/java/com/zdjizhi/common/JsonProConfig.java
@@ -1,8 +1,6 @@
package com.zdjizhi.common;
-import com.zdjizhi.utils.system.VoipRelationConfigurations;
-
/**
* @author Administrator
*/
@@ -13,123 +11,103 @@ public class JsonProConfig {
public static final int DOUBLE = 3;
/**
- *
+ * SIP日志标识
*/
public static final String SIP_MARK = "SIP";
/**
- *
+ * RTP日志标识
*/
public static final String RTP_MARK = "RTP";
+
/**
- *
+ * 所属vsys,缺省为1
+ */
+ public static final String VSYS_ID = "common_vsys_id";
+
+ /**
+ * 日志类型
*/
public static final String SCHEMA_TYPE = "common_schema_type";
/**
- *
+ * 会话结束时间
*/
public static final String END_TIME = "common_end_time";
/**
- *
+ * 流类型
+ * 1:c2s,2:s2c;3;double
*/
public static final String STREAM_DIR = "common_stream_dir";
+
/**
- *
- */
- public static final String SESSIONS = "common_sessions";
- /**
- *
- */
- public static final String C2S_PKT_NUM = "common_c2s_pkt_num";
- /**
- *
- */
- public static final String S2C_PKT_NUM = "common_s2c_pkt_num";
- /**
- *
- */
- public static final String C2S_BYTE_NUM = "common_c2s_byte_num";
- /**
- *
- */
- public static final String S2C_BYTE_NUM = "common_s2c_byte_num";
- /**
- *
- */
- public static final String C2S_IPFRAG_NUM = "common_c2s_ipfrag_num";
- /**
- *
- */
- public static final String S2C_IPFRAG_NUM = "common_s2c_ipfrag_num";
- /**
- *
- */
- public static final String C2S_TCP_LOSTLEN = "common_c2s_tcp_lostlen";
- /**
- *
- */
- public static final String S2C_TCP_LOSTLEN = "common_s2c_tcp_lostlen";
- /**
- *
- */
- public static final String C2S_TCP_UNORDER_NUM = "common_c2s_tcp_unorder_num";
- /**
- *
- */
- public static final String S2C_TCP_UNORDER_NUM = "common_s2c_tcp_unorder_num";
- /**
- *
+ * 客户端ip地址
*/
public static final String CLIENT_IP = "common_client_ip";
/**
- *
+ * 客户端端口
*/
public static final String CLIENT_PORT = "common_client_port";
/**
- *
+ * 服务端ip地址
*/
public static final String SERVER_IP = "common_server_ip";
/**
- *
+ * 服务端端口
*/
public static final String SERVER_PORT = "common_server_port";
/**
- *
+ * 会话ID
*/
public static final String SIP_CALL_ID = "sip_call_id";
/**
- *
+ * 协商的主叫语音传输IP
*/
public static final String SIP_ORIGINATOR_IP = "sip_originator_sdp_connect_ip";
/**
- *
+ * 协商的主叫语音传输端口
*/
public static final String SIP_ORIGINATOR_PORT = "sip_originator_sdp_media_port";
/**
- *
+ * 协商的被叫语音传输IP
*/
public static final String SIP_RESPONDER_IP = "sip_responder_sdp_connect_ip";
/**
- *
+ * 协商的被叫语音传输端口
*/
public static final String SIP_RESPONDER_PORT = "sip_responder_sdp_media_port";
/**
- *
+ * RTP原始包文件地址
*/
public static final String RTP_PCAP_PATH = "rtp_pcap_path";
/**
- *
+ * 主叫方向
*/
public static final String RTP_ORIGINATOR_DIR = "rtp_originator_dir";
/**
- *
+ * c2s编码方式序号(PT)
*/
public static final String RTP_PAYLOAD_TYPE_C2S = "rtp_payload_type_c2s";
/**
- *
+ * s2c编码方式序号(PT)
*/
public static final String RTP_PAYLOAD_TYPE_S2C = "rtp_payload_type_s2c";
+ /**
+ * 各类Transmission指标key
+ */
+ public static final String SESSIONS = "common_sessions";
+ public static final String C2S_PKT_NUM = "common_c2s_pkt_num";
+ public static final String S2C_PKT_NUM = "common_s2c_pkt_num";
+ public static final String C2S_BYTE_NUM = "common_c2s_byte_num";
+ public static final String S2C_BYTE_NUM = "common_s2c_byte_num";
+ public static final String C2S_IPFRAG_NUM = "common_c2s_ipfrag_num";
+ public static final String S2C_IPFRAG_NUM = "common_s2c_ipfrag_num";
+ public static final String C2S_TCP_LOSTLEN = "common_c2s_tcp_lostlen";
+ public static final String S2C_TCP_LOSTLEN = "common_s2c_tcp_lostlen";
+ public static final String C2S_TCP_UNORDER_NUM = "common_c2s_tcp_unorder_num";
+ public static final String S2C_TCP_UNORDER_NUM = "common_s2c_tcp_unorder_num";
+
+
}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
index 61c50a1..758af0f 100644
--- a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
+++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
@@ -1,7 +1,7 @@
package com.zdjizhi.common;
-import com.zdjizhi.utils.system.VoipRelationConfigurations;
+import com.zdjizhi.tools.system.VoipRelationConfigurations;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
@@ -39,7 +39,8 @@ public class VoipRelationConfig {
*/
public static final Integer VOIP_CALIBRATION_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "voip.calibration.window.time");
public static final Integer ONE_SIDED_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "one.sided.window.time");
- public static final Integer WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "window.parallelism");
+ public static final Integer MERGE_UNIFLOW_WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "merge.uniflow.window.parallelism");
+ public static final Integer CALIBRATION_WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "calibration.window.parallelism");
/**
* connection kafka
@@ -72,15 +73,11 @@ public class VoipRelationConfig {
public static final Integer MAX_REQUEST_SIZE = VoipRelationConfigurations.getIntProperty(1, "max.request.size");
public static final String TOOLS_LIBRARY = VoipRelationConfigurations.getStringProperty(0, "tools.library");
- /**
- * http
- */
- public static final String SCHEMA_HTTP = VoipRelationConfigurations.getStringProperty(0, "schema.http");
/**
* voip
*/
public static final Integer SEC_COMBINE_SR_CACHE_SECS = VoipRelationConfigurations.getIntProperty(0, "sec.combine.sr.cache.secs");
- public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(1, "check.inner.network");
+ public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(0, "check.inner.network");
}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/operator/filter/NullFilterFunction.java b/src/main/java/com/zdjizhi/operator/filter/NullFilterFunction.java
new file mode 100644
index 0000000..8be77f8
--- /dev/null
+++ b/src/main/java/com/zdjizhi/operator/filter/NullFilterFunction.java
@@ -0,0 +1,17 @@
+package com.zdjizhi.operator.filter;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.operator.filter
+ * @Description:
+ * @date 2022/9/214:59
+ */
+public class NullFilterFunction implements FilterFunction {
+ @Override
+ public boolean filter(String message) throws Exception {
+ return StringUtil.isNotBlank(message);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java b/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java
new file mode 100644
index 0000000..351bee7
--- /dev/null
+++ b/src/main/java/com/zdjizhi/operator/group/MergeUniFlowKeyByFunction.java
@@ -0,0 +1,22 @@
+package com.zdjizhi.operator.group;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.operator.group
+ * @Description:
+ * @date 2022/8/2911:08
+ */
+public class MergeUniFlowKeyByFunction implements KeySelector, Integer>, Integer> {
+ @Override
+ public Integer getKey(Tuple3, Integer> value) throws Exception {
+ //vsys id
+ return value.f2;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java b/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java
new file mode 100644
index 0000000..a184566
--- /dev/null
+++ b/src/main/java/com/zdjizhi/operator/group/VoipCalibrationKeyByFunction.java
@@ -0,0 +1,27 @@
+package com.zdjizhi.operator.group;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.operator.group
+ * @Description:
+ * @date 2022/8/2911:08
+ */
+public class VoipCalibrationKeyByFunction implements KeySelector, Integer>, Tuple2> {
+ @Override
+ public Tuple2 getKey(Tuple4, Integer> value) throws Exception {
+ String fourKey = value.f0;
+ Integer vsysId = value.f3;
+
+ return new Tuple2<>(fourKey, vsysId);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java b/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java
new file mode 100644
index 0000000..3d7ca4b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/operator/parse/ParseMapFunction.java
@@ -0,0 +1,82 @@
+package com.zdjizhi.operator.parse;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.JsonProConfig;
+import com.zdjizhi.tools.utils.RelationUtils;
+import com.zdjizhi.tools.json.JsonParseUtil;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.operator.parse
+ * @Description:
+ * @date 2022/9/916:21
+ */
+public class ParseMapFunction implements MapFunction, Integer>> {
+ private static final Log logger = LogFactory.get();
+
+ @Override
+ public Tuple3, Integer> map(String input) throws Exception {
+ try {
+ if (StringUtil.isNotBlank(input)) {
+ Map jsonMap = JsonParseUtil.typeTransform((Map) JsonMapper.fromJsonString(input, Map.class));
+
+ String commonSchemaType = JsonParseUtil.getString(jsonMap, JsonProConfig.SCHEMA_TYPE);
+ Integer vsysId = getVsysId(jsonMap);
+ String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID);
+
+ //1:c2s,2:s2c;3;double
+ int commonStreamDir = JsonParseUtil.getInteger(jsonMap, JsonProConfig.STREAM_DIR);
+
+ if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
+ if (StringUtil.isNotBlank(sipCallId)) {
+ if (RelationUtils.checkSipCompleteness(jsonMap)) {
+ if (commonStreamDir != JsonProConfig.DOUBLE) {
+ return new Tuple3<>("sip-single", jsonMap, vsysId);
+ } else {
+ return new Tuple3<>("sip-double", jsonMap, vsysId);
+ }
+ } else {
+ return new Tuple3<>("violation", jsonMap, vsysId);
+ }
+ } else {
+ return new Tuple3<>("violation", jsonMap, vsysId);
+ }
+ } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
+ if (commonStreamDir == JsonProConfig.DOUBLE) {
+ return new Tuple3<>("rtp-double", jsonMap, vsysId);
+ } else {
+ return new Tuple3<>("rtp-single", jsonMap, vsysId);
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("TransForm logs failed,The exception is :" + e.getMessage());
+ }
+ return null;
+ }
+
+ /**
+ * 获取VSYS ID若无该字段,则默认值为1
+ *
+ * @param jsonMap 原始日志
+ * @return vsysid
+ */
+ private static int getVsysId(Map jsonMap) {
+ Object value = jsonMap.getOrDefault(JsonProConfig.VSYS_ID, null);
+ if (value != null) {
+ return Integer.parseInt(value.toString());
+ } else {
+ return 1;
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java b/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java
similarity index 54%
rename from src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
rename to src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java
index 0e244c3..cf25763 100644
--- a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
+++ b/src/main/java/com/zdjizhi/operator/window/MergeUniFlowWindowFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.functions;
+package com.zdjizhi.operator.window;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -7,167 +7,162 @@ import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.IPUtil;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.tools.utils.RelationUtils;
+import com.zdjizhi.tools.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
+import scala.Int;
import java.util.HashMap;
import java.util.Map;
/**
* @author qidaijie
- * @Package com.zdjizhi.utils.functions
+ * @Package com.zdjizhi.operator
* @Description:
- * @date 2021/8/1818:04
+ * @date 2022/8/2911:44
*/
-public class OneSidedWindowFunction extends ProcessAllWindowFunction, TimeWindow> {
+public class MergeUniFlowWindowFunction extends ProcessWindowFunction, Integer>, Tuple4, Integer>, Integer, TimeWindow> {
private static final Log logger = LogFactory.get();
/**
* key-sip_call_id;value为sip的具体数据---存放的是SIP未关联的数据(单向流)
*/
- private static HashMap sipOriHmList = new HashMap<>(32);
+ private static HashMap> sipOriHmList = new HashMap<>(32);
/**
* key-rtp拼接的四元组;value为rtp的具体数据---存放的是RTP未关联的数据(单向流)
*/
- private static HashMap rtpOriHmList = new HashMap<>(32);
+ private static HashMap> rtpOriHmList = new HashMap<>(32);
+
@Override
- @SuppressWarnings("unchecked")
- public void process(Context context, Iterable inputs, Collector> out) {
- for (String input : inputs) {
- if (StringUtil.isNotBlank(input)) {
- try {
- Map object = JsonParseUtil.typeTransform((Map) JsonMapper.fromJsonString(input, Map.class));
+ public void process(Integer key, Context context, Iterable, Integer>> inputs, Collector, Integer>> out) throws Exception {
+ for (Tuple3, Integer> input : inputs) {
+ if (input != null) {
+ //已关联的sip,rtp;未关联的sip,rtp;内网的sip
+ String type = input.f0;
+ Map jsonMap = input.f1;
+ Integer vsysId = input.f2;
- String commonSchemaType = JsonParseUtil.getString(object, JsonProConfig.SCHEMA_TYPE);
- String sipCallId = JsonParseUtil.getString(object, JsonProConfig.SIP_CALL_ID);
+ logger.error("" + type + "----" + jsonMap.toString());
+ switch (type) {
+ case "sip-double":
+ separateInnerIp(jsonMap, out, vsysId);
+ break;
+ case "rtp-double":
+ String rtpDouble4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
+ JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
- //1:c2s,2:s2c;3;double
- int commonStreamDir = JsonParseUtil.getInteger(object, JsonProConfig.STREAM_DIR);
-
- /*
- * 针对SIP日志进行处理
- */
- if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) {
- if (relationUtils.checkSipCompleteness(object)) {
- if (commonStreamDir != JsonProConfig.DOUBLE) {
- putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out);
- } else {
- separateInnerIp(object, out);
- }
+ out.collect(new Tuple4<>(rtpDouble4Key, type, jsonMap, vsysId));
+ break;
+ case "violation":
+ out.collect(new Tuple4<>("", type, jsonMap, vsysId));
+ break;
+ //单向流的SIP
+ case "sip-single":
+ String sipCallId = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_CALL_ID);
+ String sipSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
+ JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
+ if (StringUtil.isNotBlank(sipCallId)) {
+ putKeyAndMsg(jsonMap, sipCallId + sipSingle4Key, sipOriHmList, "SIP", vsysId, out);
} else {
- out.collect(new Tuple3<>("", "violation", input));
+ out.collect(new Tuple4<>("", type, jsonMap, vsysId));
}
- }
+ break;
+ //单向流的RTP
+ case "rtp-single":
+ String rtpSingle4Key = getFourKey(JsonParseUtil.getString(jsonMap, JsonProConfig.CLIENT_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.CLIENT_PORT),
+ JsonParseUtil.getString(jsonMap, JsonProConfig.SERVER_IP),
+ JsonParseUtil.getInteger(jsonMap, JsonProConfig.SERVER_PORT));
+ //对rtp单向流进行关联
+ putKeyAndMsg(jsonMap, rtpSingle4Key, rtpOriHmList, "RTP", vsysId, out);
+ break;
+ //内网的SIP
+ default:
+ logger.error("type is beyond expectation:" + type);
+ break;
- /*
- * 针对RTP日志进行处理
- */
- if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
- String clientIP = JsonParseUtil.getString(object, JsonProConfig.CLIENT_IP);
- int clientPort = JsonParseUtil.getInteger(object, JsonProConfig.CLIENT_PORT);
- String ServerIP = JsonParseUtil.getString(object, JsonProConfig.SERVER_IP);
- int ServerPort = JsonParseUtil.getInteger(object, JsonProConfig.SERVER_PORT);
-
- String rtpIpPort4Key = getFourKey(clientIP, clientPort, ServerIP, ServerPort);
-
- if (commonStreamDir != JsonProConfig.DOUBLE) {
- //对rtp单向流进行关联
- putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, "RTP", out);
-
- } else {
- //RTP双向流,按四元组下发
- out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input));
- }
- }
- } catch (RuntimeException e) {
- logger.error("parsing JSON or Unidirectional data flow fusion has exception! error is :" + e);
}
}
}
- /*
- * 定时发送SIP未关联上数据
- */
+
if (sipOriHmList.size() > 0) {
- HashMap tmpSipOriHmList = new HashMap<>(sipOriHmList);
+ HashMap> tmpSipOriHmList = new HashMap<>(sipOriHmList);
sipOriHmList.clear();
for (String sipKey : tmpSipOriHmList.keySet()) {
- String sipSingleMsg = tmpSipOriHmList.get(sipKey);
+ Map sipSingleMsg = tmpSipOriHmList.get(sipKey);
//sipKey为sip_call_id,未关联成功的sip是不能使用的
- out.collect(new Tuple3<>(sipKey, "sip-single", sipSingleMsg));
+ out.collect(new Tuple4<>(sipKey, "sip-single", sipSingleMsg, 0));
}
}
- /*
- * 定时发送RTP未关联上数据
- */
+
if (rtpOriHmList.size() > 0) {
- HashMap tmpRtpOriHmList = new HashMap<>(rtpOriHmList);
+ HashMap> tmpRtpOriHmList = new HashMap<>(rtpOriHmList);
rtpOriHmList.clear();
for (String rtpKey : tmpRtpOriHmList.keySet()) {
- String rtpSingleMsg = tmpRtpOriHmList.get(rtpKey);
+ Map rtpSingleMsg = tmpRtpOriHmList.get(rtpKey);
//未关联成功的rtp还可以继续关联,因为有四元组
- out.collect(new Tuple3<>(rtpKey, "rtp-single", rtpSingleMsg));
+ out.collect(new Tuple4<>(rtpKey, "rtp-single", rtpSingleMsg, 0));
}
}
+
}
/**
* 存放key并关联拼接对应Key
*/
@SuppressWarnings("unchecked")
- private static void putKeyAndMsg(String message, String hmStrKey, HashMap hashMapStr, String protocolType, Collector> out) {
+ private static void putKeyAndMsg(Map secondSipOrRtpLog, String hmStrKey, HashMap> hashMapStr, String protocolType, Integer vsysId, Collector, Integer>> out) {
//和上次存入的数据关联
if (hashMapStr.containsKey(hmStrKey)) {
- HashMap jsonCommonMap = new HashMap<>(32);
- String[] strArr = new String[2];
- String firstMsg = hashMapStr.remove(hmStrKey);
- Map firstSipOrRtpLog = (Map) JsonMapper.fromJsonString(firstMsg, Map.class);
- Map secondSipOrRtpLog = (Map) JsonMapper.fromJsonString(message, Map.class);
+ HashMap jsonCommonMap = new HashMap<>(128);
+ Map firstSipOrRtpLog = hashMapStr.remove(hmStrKey);
//1:c2s,2:s2c;3;double,1表示firstMsg为请求侧(c2s),合并时以它为准
if (JsonParseUtil.getInteger(firstSipOrRtpLog, JsonProConfig.STREAM_DIR) == 1) {
- strArr[0] = message;
- strArr[1] = firstMsg;
+ jsonCommonMap.putAll(secondSipOrRtpLog);
+ jsonCommonMap.putAll(firstSipOrRtpLog);
} else {
- strArr[0] = firstMsg;
- strArr[1] = message;
+ jsonCommonMap.putAll(firstSipOrRtpLog);
+ jsonCommonMap.putAll(secondSipOrRtpLog);
}
- jsonCommonMap.putAll((Map) JsonMapper.fromJsonString(strArr[0], Map.class));
- jsonCommonMap.putAll((Map) JsonMapper.fromJsonString(strArr[1], Map.class));
- String sipTwoMsg = jsonCommonMap.toString();
+ accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, jsonCommonMap);
+ jsonCommonMap.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);
- Map sipOrRtpCombin = (Map) JsonMapper.fromJsonString(sipTwoMsg, Map.class);
- accumulateMsg(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin);
- sipOrRtpCombin.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);
if (JsonProConfig.SIP_MARK.equals(protocolType)) {
//手动关联SIP后区分内外网IP再下发
- separateInnerIp(sipOrRtpCombin, out);
+ separateInnerIp(jsonCommonMap, out, vsysId);
} else if (JsonProConfig.RTP_MARK.equals(protocolType)) {
//手动关联RTP后按四元组下发
- sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog));
- out.collect(new Tuple3<>(hmStrKey, "rtp-two", JsonMapper.toJsonString(sipOrRtpCombin)));
+ jsonCommonMap.put(JsonProConfig.RTP_PCAP_PATH, setRtpPacpPath(firstSipOrRtpLog, secondSipOrRtpLog));
+ out.collect(new Tuple4<>(hmStrKey, "rtp-double", jsonCommonMap, vsysId));
}
} else {
- hashMapStr.put(hmStrKey, message);
+ hashMapStr.put(hmStrKey, secondSipOrRtpLog);
}
}
/**
* 区分SIP的内外网IP,此时已经关联完成包含四元组,但未区分内外网IP
*/
- private static void separateInnerIp(Map object, Collector> out) {
+ private static void separateInnerIp(Map jsonMap, Collector, Integer>> out, Integer vsysid) {
- String sipOriginatorIp = JsonParseUtil.getString(object, JsonProConfig.SIP_ORIGINATOR_IP);
- int sipOriginatorPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_ORIGINATOR_PORT);
- String sipResponderIp = JsonParseUtil.getString(object, JsonProConfig.SIP_RESPONDER_IP);
- int sipResponderPort = JsonParseUtil.getInteger(object, JsonProConfig.SIP_RESPONDER_PORT);
+ String sipOriginatorIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_ORIGINATOR_IP);
+ int sipOriginatorPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_ORIGINATOR_PORT);
+ String sipResponderIp = JsonParseUtil.getString(jsonMap, JsonProConfig.SIP_RESPONDER_IP);
+ int sipResponderPort = JsonParseUtil.getInteger(jsonMap, JsonProConfig.SIP_RESPONDER_PORT);
- if (relationUtils.isInnerIp(sipOriginatorIp) || relationUtils.isInnerIp(sipResponderIp)) {
+ if (RelationUtils.isInnerIp(sipOriginatorIp) || RelationUtils.isInnerIp(sipResponderIp)) {
/*
* 按from-ip_from-port_to-ip_to-port
*/
@@ -176,7 +171,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction(sipInnerEmitKey, "sip-in", JsonMapper.toJsonString(object)));
+ out.collect(new Tuple4<>(sipInnerEmitKey, "sip-in", jsonMap, vsysid));
} else {
String sipIpPort4Key = getFourKey(sipOriginatorIp,
sipOriginatorPort,
@@ -184,7 +179,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction(sipIpPort4Key, "sip-two", JsonMapper.toJsonString(object)));
+ out.collect(new Tuple4<>(sipIpPort4Key, "sip-double", jsonMap, vsysid));
}
}
@@ -199,7 +194,7 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction serverIpNum
case 1:
@@ -284,57 +279,27 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction firstSipOrRtpLog, Map secondSipOrRtpLog, Map sipOrRtpCombin) {
//common_sessions
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.SESSIONS);
//common_c2s_pkt_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_PKT_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_PKT_NUM);
//common_s2c_pkt_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_PKT_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_PKT_NUM);
//common_c2s_byte_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_BYTE_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_BYTE_NUM);
//common_s2c_byte_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_BYTE_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_BYTE_NUM);
//common_c2s_ipfrag_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_IPFRAG_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_IPFRAG_NUM);
//common_s2c_ipfrag_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_IPFRAG_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_IPFRAG_NUM);
//common_c2s_tcp_lostlen
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_LOSTLEN);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_LOSTLEN);
//common_s2c_tcp_lostlen
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_LOSTLEN);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_LOSTLEN);
//common_c2s_tcp_unorder_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_UNORDER_NUM);
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.C2S_TCP_UNORDER_NUM);
//common_s2c_tcp_unorder_num
- relationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_UNORDER_NUM);
- }
-
- /**
- * int类型
- * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
- *
- * @param numOne 数值1
- * @param numTwo 数值2
- */
- private static int compareNum(int numOne, int numTwo) {
- if (numOne > 0 && numTwo > 0) {
- return Integer.compare(numOne, numTwo);
- } else {
- return -2;
- }
- }
-
- /**
- * long类型
- * 比较数字大小,左边>右边-返回:1,左边<右边-返回:-1,左边=右边-返回:0
- *
- * @param numOne 数值1
- * @param numTwo 数值2
- */
- private static int compareNum(long numOne, long numTwo) {
- if (numOne > 0 && numTwo > 0) {
- return Long.compare(numOne, numTwo);
- } else {
- return -2;
- }
+ RelationUtils.metricSumSetOtherLog(firstSipOrRtpLog, secondSipOrRtpLog, sipOrRtpCombin, JsonProConfig.S2C_TCP_UNORDER_NUM);
}
/**
diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java
similarity index 52%
rename from src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
rename to src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java
index 7180419..9929212 100644
--- a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
+++ b/src/main/java/com/zdjizhi/operator/window/VoipCalibrationWindowFunction.java
@@ -1,14 +1,16 @@
-package com.zdjizhi.utils.functions;
+package com.zdjizhi.operator.window;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.JsonProConfig;
import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.tools.utils.RelationUtils;
+import com.zdjizhi.tools.json.JsonParseUtil;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -16,11 +18,11 @@ import java.util.*;
/**
* @author qidaijie
- * @Package com.zdjizhi.utils.functions
+ * @Package com.zdjizhi.operator.window
* @Description:
- * @date 2021/7/2113:55
+ * @date 2022/9/617:46
*/
-public class SipCalibrationWindowFunction extends ProcessAllWindowFunction, String, TimeWindow> {
+public class VoipCalibrationWindowFunction extends ProcessWindowFunction, Integer>, String, Tuple2, TimeWindow> {
private static final Log logger = LogFactory.get();
/**
@@ -30,7 +32,7 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction> combineSRHmList = new HashMap<>(16);
+ private static HashMap>> combineSRHmList = new HashMap<>(32);
/**
* 二次关联用HashMap
@@ -39,31 +41,33 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction> secCombineSRHmList = new HashMap<>(16);
+ private static HashMap>> secCombineSRHmList = new HashMap<>(32);
@Override
- public void process(Context context, Iterable> input, Collector output) throws Exception {
- for (Tuple3 tuple : input) {
+ public void process(Tuple2 key, Context context, Iterable, Integer>> input, Collector out) throws Exception {
+ for (Tuple4, Integer> tuple : input) {
//拼接的四元组
String fourKey = tuple.f0;
//已关联的sip,rtp;未关联的sip,rtp;内网的sip
String type = tuple.f1;
- String msg = tuple.f2;
+ Map jsonMap = tuple.f2;
+ logger.error("" + fourKey + "----" + type);
switch (type) {
//单向流对准后的SIP
- case "sip-two":
+ case "sip-double":
//单向流对准后的RTP
- case "rtp-two":
- //对不上的RTP
- case "rtp-single":
- putKeyAndMsg(msg, fourKey, combineSRHmList);
+ case "rtp-double":
+ putKeyAndMsg(jsonMap, fourKey, combineSRHmList);
break;
//单向流的SIP
case "sip-single":
+ //单向流的RTP
+ case "rtp-single":
//内网的SIP
case "sip-in":
+ //违规的日志
case "violation":
- output.collect(msg);
+ out.collect(JsonMapper.toJsonString(jsonMap));
break;
default:
logger.error("type is beyond expectation:" + type);
@@ -72,82 +76,91 @@ public class SipCalibrationWindowFunction extends ProcessAllWindowFunction message, String fourStrKey, HashMap>> combineSRHmList) {
+ if (combineSRHmList.containsKey(fourStrKey)) {
+ LinkedList