diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..1491d5e
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,241 @@
+
+
+
+ 4.0.0
+
+ com.zdjizhi
+ log-stream-voip-relation
+ 210908-security
+
+ log-stream-voip-relation
+ http://www.example.com
+
+
+
+
+ nexus
+ Team Nexus Repository
+ http://192.168.40.125:8099/content/groups/public
+
+
+
+ maven-ali
+ http://maven.aliyun.com/nexus/content/groups/public/
+
+
+
+
+
+ fail
+
+
+
+
+
+ UTF-8
+ 1.13.1
+ 2.7.1
+ 1.0.0
+ 2.2.3
+ provided
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 2.4.2
+
+
+ package
+
+ shade
+
+
+
+
+ com.zdjizhi.topology.VoIpRelationTopology
+
+
+
+
+
+
+
+
+ io.github.zlika
+ reproducible-build-maven-plugin
+ 0.2
+
+
+
+ strip-jar
+
+ package
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 2.3.2
+
+ 1.8
+ 1.8
+
+
+
+
+
+ properties
+
+ **/*.properties
+ **/*.xml
+
+ false
+
+
+
+ src\main\java
+
+ log4j.properties
+
+ false
+
+
+
+
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.70
+
+
+
+ com.zdjizhi
+ galaxy
+ 1.0.6
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ log4j-over-slf4j
+ org.slf4j
+
+
+
+
+
+
+ org.apache.flink
+ flink-table
+ ${flink.version}
+ pom
+ ${scope.type}
+
+
+
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+ ${scope.type}
+
+
+
+
+
+ org.apache.flink
+ flink-streaming-java_2.11
+ ${flink.version}
+
+
+
+
+
+ org.apache.flink
+ flink-clients_2.11
+ ${flink.version}
+ ${scope.type}
+
+
+
+
+ org.apache.flink
+ flink-connector-kafka_2.11
+ ${flink.version}
+
+
+
+
+
+ org.apache.flink
+ flink-java
+ ${flink.version}
+ ${scope.type}
+
+
+
+ cglib
+ cglib-nodep
+ 3.2.4
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ 5.3.2
+ compile
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.2
+
+
+
+ com.jayway.jsonpath
+ json-path
+ 2.4.0
+
+
+
+
+ cn.hutool
+ hutool-all
+ 5.5.2
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.21
+
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.21
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+
+
+
diff --git a/properties/default_config.properties b/properties/default_config.properties
new file mode 100644
index 0000000..2b9bfb1
--- /dev/null
+++ b/properties/default_config.properties
@@ -0,0 +1,29 @@
+#producer重试的次数设置
+retries=0
+
+#他的含义就是说一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了
+linger.ms=5
+
+#如果在超时之前未收到响应,客户端将在必要时重新发送请求
+request.timeout.ms=30000
+
+#producer都是按照batch进行发送的,批次大小,默认:16384
+batch.size=262144
+
+#Producer端用于缓存消息的缓冲区大小
+buffer.memory=67108864
+
+#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576
+max.request.size=5242880
+
+#kafka SASL验证用户名
+kafka.user=admin
+
+#kafka SASL及SSL验证密码
+kafka.pin=galaxy2019
+
+#kafka source protocol; SSL or SASL
+kafka.source.protocol=SASL
+
+#kafka sink protocol; SSL or SASL
+kafka.sink.protocol=SASL
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
new file mode 100644
index 0000000..939b6d8
--- /dev/null
+++ b/properties/service_flow_config.properties
@@ -0,0 +1,45 @@
+#--------------------------------鍦板潃閰嶇疆------------------------------#
+
+#绠$悊kafka鍦板潃
+input.kafka.servers=192.168.44.12:9092
+
+#绠$悊杈撳嚭kafka鍦板潃
+output.kafka.servers=192.168.44.12:9092
+
+#--------------------------------HTTP------------------------------#
+#瀹氫綅搴撳湴鍧
+tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+
+#缃戝叧鐨剆chema浣嶇疆
+schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/voip_record_log
+
+#--------------------------------Kafka娑堣垂缁勪俊鎭------------------------------#
+
+#kafka 鎺ユ敹鏁版嵁topic
+input.kafka.topic=VOIP-RECORD
+
+#琛ュ叏鏁版嵁 杈撳嚭 topic
+output.kafka.topic=VOIP-CONVERSATION-RECORD
+
+#璇诲彇topic,瀛樺偍璇pout id鐨勬秷璐筼ffset淇℃伅锛屽彲閫氳繃璇ユ嫇鎵戝懡鍚;鍏蜂綋瀛樺偍offset鐨勪綅缃紝纭畾涓嬫璇诲彇涓嶉噸澶嶇殑鏁版嵁锛
+group.id=voip-relation-log-20210625-1
+
+#鐢熶骇鑰呭帇缂╂ā寮 none or snappy
+producer.kafka.compression.type=none
+
+#鐢熶骇鑰卆ck
+producer.ack=1
+
+#--------------------------------topology閰嶇疆------------------------------#
+
+#map鍑芥暟骞惰搴
+window.parallelism=1
+
+#voip鏃ュ織瀵瑰噯绐楀彛鏃堕棿 seconds
+voip.calibration.window.time=15
+
+#鍗曞悜娴佸鍑嗙獥鍙f椂闂 seconds
+one.sided.window.time=60
+
+#voip浜屾瀵瑰噯鏃堕棿 seconds
+sec.combine.sr.cache.secs=300
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/JsonProConfig.java b/src/main/java/com/zdjizhi/common/JsonProConfig.java
new file mode 100644
index 0000000..6e9d437
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/JsonProConfig.java
@@ -0,0 +1,135 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.VoipRelationConfigurations;
+
+/**
+ * @author Administrator
+ */
+public class JsonProConfig {
+ /**
+ * 鍙屽悜娴佹爣璇
+ */
+ public static final int DOUBLE = 3;
+
+ /**
+ *
+ */
+ public static final String SIP_MARK = "SIP";
+
+ /**
+ *
+ */
+ public static final String RTP_MARK = "RTP";
+ /**
+ *
+ */
+ public static final String SCHEMA_TYPE = "common_schema_type";
+ /**
+ *
+ */
+ public static final String END_TIME = "common_end_time";
+ /**
+ *
+ */
+ public static final String STREAM_DIR = "common_stream_dir";
+ /**
+ *
+ */
+ public static final String SESSIONS = "common_sessions";
+ /**
+ *
+ */
+ public static final String C2S_PKT_NUM = "common_c2s_pkt_num";
+ /**
+ *
+ */
+ public static final String S2C_PKT_NUM = "common_s2c_pkt_num";
+ /**
+ *
+ */
+ public static final String C2S_BYTE_NUM = "common_c2s_byte_num";
+ /**
+ *
+ */
+ public static final String S2C_BYTE_NUM = "common_s2c_byte_num";
+ /**
+ *
+ */
+ public static final String C2S_IPFRAG_NUM = "common_c2s_ipfrag_num";
+ /**
+ *
+ */
+ public static final String S2C_IPFRAG_NUM = "common_s2c_ipfrag_num";
+ /**
+ *
+ */
+ public static final String C2S_TCP_LOSTLEN = "common_c2s_tcp_lostlen";
+ /**
+ *
+ */
+ public static final String S2C_TCP_LOSTLEN = "common_s2c_tcp_lostlen";
+ /**
+ *
+ */
+ public static final String C2S_TCP_UNORDER_NUM = "common_c2s_tcp_unorder_num";
+ /**
+ *
+ */
+ public static final String S2C_TCP_UNORDER_NUM = "common_s2c_tcp_unorder_num";
+ /**
+ *
+ */
+ public static final String CLIENT_IP = "common_client_ip";
+ /**
+ *
+ */
+ public static final String CLIENT_PORT = "common_client_port";
+ /**
+ *
+ */
+ public static final String SERVER_IP = "common_server_ip";
+ /**
+ *
+ */
+ public static final String SERVER_PORT = "common_server_port";
+
+ /**
+ *
+ */
+ public static final String SIP_CALL_ID = "sip_call_id";
+ /**
+ *
+ */
+ public static final String SIP_ORIGINATOR_IP = "sip_originator_sdp_connect_ip";
+ /**
+ *
+ */
+ public static final String SIP_ORIGINATOR_PORT = "sip_originator_sdp_media_port";
+ /**
+ *
+ */
+ public static final String SIP_RESPONDER_IP = "sip_responder_sdp_connect_ip";
+ /**
+ *
+ */
+ public static final String SIP_RESPONDER_PORT = "sip_responder_sdp_media_port";
+ /**
+ *
+ */
+ public static final String RTP_PCAP_PATH = "rtp_pcap_path";
+ /**
+ *
+ */
+ public static final String RTP_ORIGINATOR_DIR = "rtp_originator_dir";
+ /**
+ *
+ */
+ public static final String RTP_PAYLOAD_TYPE_C2S = "rtp_payload_type_c2s";
+ /**
+ *
+ */
+ public static final String RTP_PAYLOAD_TYPE_S2C = "rtp_payload_type_s2c";
+
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
new file mode 100644
index 0000000..abd41c7
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
@@ -0,0 +1,62 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.VoipRelationConfigurations;
+
+/**
+ * @author Administrator
+ */
+public class VoipRelationConfig {
+
+ /**
+ * 鍥涘厓缁勭殑鎷兼帴杩炴帴瀛楃
+ */
+ public static final String CORRELATION_STR = "_";
+
+ public static final String VISIBILITY = "disabled";
+ public static final String FORMAT_SPLITTER = ",";
+
+ /**
+ * System
+ */
+ public static final Integer VOIP_CALIBRATION_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "voip.calibration.window.time");
+ public static final Integer ONE_SIDED_WINDOW_TIME = VoipRelationConfigurations.getIntProperty(0, "one.sided.window.time");
+ public static final Integer WINDOW_PARALLELISM = VoipRelationConfigurations.getIntProperty(0, "window.parallelism");
+
+ /**
+ * connection kafka
+ */
+ public static final String INPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "input.kafka.servers");
+ public static final String OUTPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "output.kafka.servers");
+ public static final String INPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "input.kafka.topic");
+ public static final String OUTPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "output.kafka.topic");
+ public static final String GROUP_ID = VoipRelationConfigurations.getStringProperty(0, "group.id");
+ public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(0, "producer.ack");
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+ public static final String KAFKA_USER = VoipRelationConfigurations.getStringProperty(1, "kafka.user");
+ public static final String KAFKA_PIN = VoipRelationConfigurations.getStringProperty(1, "kafka.pin");
+
+ /**
+ * kafka sink
+ */
+ public static final String RETRIES = VoipRelationConfigurations.getStringProperty(1, "retries");
+ public static final String LINGER_MS = VoipRelationConfigurations.getStringProperty(1, "linger.ms");
+ public static final Integer REQUEST_TIMEOUT_MS = VoipRelationConfigurations.getIntProperty(1, "request.timeout.ms");
+ public static final Integer BATCH_SIZE = VoipRelationConfigurations.getIntProperty(1, "batch.size");
+ public static final Integer BUFFER_MEMORY = VoipRelationConfigurations.getIntProperty(1, "buffer.memory");
+ public static final Integer MAX_REQUEST_SIZE = VoipRelationConfigurations.getIntProperty(1, "max.request.size");
+ public static final String KAFKA_SOURCE_PROTOCOL = VoipRelationConfigurations.getStringProperty(1, "kafka.source.protocol");
+ public static final String KAFKA_SINK_PROTOCOL = VoipRelationConfigurations.getStringProperty(1, "kafka.sink.protocol");
+ public static final String TOOLS_LIBRARY = VoipRelationConfigurations.getStringProperty(0, "tools.library");
+
+ /**
+ * http
+ */
+ public static final String SCHEMA_HTTP = VoipRelationConfigurations.getStringProperty(0, "schema.http");
+
+ /**
+ * voip
+ */
+ public static final Integer SEC_COMBINE_SR_CACHE_SECS = VoipRelationConfigurations.getIntProperty(0, "sec.combine.sr.cache.secs");
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
new file mode 100644
index 0000000..37b2f18
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
@@ -0,0 +1,64 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.functions.*;
+import com.zdjizhi.utils.kafka.Consumer;
+import com.zdjizhi.utils.kafka.Producer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.WindowedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2021/5/2016:42
+ */
+public class VoIpRelationTopology {
+ private static final Log logger = LogFactory.get();
+
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// environment.enableCheckpointing(5000);
+
+ DataStream streamSource = environment.addSource(Consumer.getKafkaConsumer());
+
+ SingleOutputStreamOperator> sipCorrelation =
+ streamSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME)))
+ .process(new OneSidedWindowFunction()).name("OneSidedWindow");
+
+ SingleOutputStreamOperator window = sipCorrelation.windowAll(
+ TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME)))
+ .process(new SipCalibrationWindow()).name("SipCalibrationWindow");
+
+ window.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka");
+
+// KeyedStream, String> keyedStream = sipCorrelation.keyBy(new KeyByFunction());
+//
+// WindowedStream, String, TimeWindow> window =
+// keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME)));
+//
+// SingleOutputStreamOperator output = window.process(new SipCalibrationWindowFunction())
+// .name("SipCalibrationWindow").setParallelism(VoipRelationConfig.WINDOW_PARALLELISM);
+
+// output.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka");
+
+ try {
+ environment.execute("VOIP-RELATION");
+ } catch (Exception e) {
+ logger.error("This Flink task start ERROR! Exception information is :" + e);
+ }
+
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/exception/VoipRelationException.java b/src/main/java/com/zdjizhi/utils/exception/VoipRelationException.java
new file mode 100644
index 0000000..b2ef9e9
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/exception/VoipRelationException.java
@@ -0,0 +1,18 @@
+package com.zdjizhi.utils.exception;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.storm.utils.execption
+ * @Description:
+ * @date 2021/3/259:42
+ */
+public class VoipRelationException extends RuntimeException {
+
+ public VoipRelationException() {
+ }
+
+ public VoipRelationException(String message) {
+ super(message);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
new file mode 100644
index 0000000..0b00b3c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.utils.functions;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2112:13
+ */
+public class KeyByFunction implements KeySelector, String> {
+
+ @Override
+ public String getKey(Tuple3 value) throws Exception {
+ //浠ap鎷兼帴鐨刱ey鍒嗙粍
+ return value.f1;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
new file mode 100644
index 0000000..38e6bdd
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
@@ -0,0 +1,367 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.common.JsonProConfig;
+import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.ip.IPUtils;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/8/1818:04
+ */
+public class OneSidedWindowFunction extends ProcessAllWindowFunction, TimeWindow> {
+ private static final Log logger = LogFactory.get();
+ /**
+ * key-sip_call_id;value涓簊ip鐨勫叿浣撴暟鎹---瀛樻斁鐨勬槸SIP鏈叧鑱旂殑鏁版嵁(鍗曞悜娴)
+ */
+ private static HashMap sipOriHmList = new HashMap<>(16);
+
+ /**
+ * key-rtp鎷兼帴鐨勫洓鍏冪粍;value涓簉tp鐨勫叿浣撴暟鎹---瀛樻斁鐨勬槸RTP鏈叧鑱旂殑鏁版嵁(鍗曞悜娴)
+ */
+ private static HashMap rtpOriHmList = new HashMap<>(16);
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void process(Context context, Iterable inputs, Collector> out) throws Exception {
+
+ for (String input : inputs) {
+ if (StringUtil.isNotBlank(input)) {
+ JSONObject object = JSONObject.parseObject(input);
+
+ String commonSchemaType = object.getString(JsonProConfig.SCHEMA_TYPE);
+ String sipCallId = object.getString(JsonProConfig.SIP_CALL_ID);
+
+ //1锛歝2s锛2锛歴2c锛3锛沝ouble
+ int commonStreamDir = object.getInteger(JsonProConfig.STREAM_DIR);
+
+ /*
+ * 閽堝SIP鏃ュ織杩涜澶勭悊
+ */
+ if (JsonProConfig.SIP_MARK.equals(commonSchemaType) && StringUtil.isNotBlank(sipCallId)) {
+ if (commonStreamDir != JsonProConfig.DOUBLE) {
+ putKeyAndMsg(input, sipCallId, sipOriHmList, "SIP", out);
+ } else {
+ separateInnerIp(object, out);
+ }
+ }
+
+ /*
+ * 閽堝RTP鏃ュ織杩涜澶勭悊
+ */
+ if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
+
+ String rtpIpPort4Key = getFourKey(object.getString(JsonProConfig.CLIENT_IP),
+ object.getInteger(JsonProConfig.CLIENT_PORT),
+ object.getString(JsonProConfig.SERVER_IP),
+ object.getInteger(JsonProConfig.SERVER_PORT));
+
+ if (commonStreamDir != JsonProConfig.DOUBLE) {
+ //瀵箁tp鍗曞悜娴佽繘琛屽叧鑱
+ putKeyAndMsg(input, rtpIpPort4Key, rtpOriHmList, "RTP", out);
+
+ } else {
+ //RTP鍙屽悜娴,鎸夊洓鍏冪粍涓嬪彂
+ out.collect(new Tuple3<>(rtpIpPort4Key, "rtp-two", input));
+ }
+ }
+ }
+ }
+ /*
+ * 瀹氭椂鍙戦丼IP鎴朢TP鏈叧鑱斾笂鏁版嵁
+ */
+ if (sipOriHmList.size() > 0) {
+ HashMap tmpSipOriHmList = new HashMap(sipOriHmList);
+ sipOriHmList.clear();
+ for (String sipKey : tmpSipOriHmList.keySet()) {
+ String sipSingleMsg = tmpSipOriHmList.get(sipKey);
+ //sipKey涓簊ip_call_id,鏈叧鑱旀垚鍔熺殑sip鏄笉鑳戒娇鐢ㄧ殑
+ out.collect(new Tuple3<>(sipKey, "sip-single", sipSingleMsg));
+ }
+ }
+
+ if (rtpOriHmList.size() > 0) {
+ HashMap tmpRtpOriHmList = new HashMap(rtpOriHmList);
+ rtpOriHmList.clear();
+ for (String rtpKey : tmpRtpOriHmList.keySet()) {
+ String rtpSingleMsg = tmpRtpOriHmList.get(rtpKey);
+ //鏈叧鑱旀垚鍔熺殑rtp杩樺彲浠ョ户缁叧鑱,鍥犱负鏈夊洓鍏冪粍
+ out.collect(new Tuple3<>(rtpKey, "rtp-single", rtpSingleMsg));
+ }
+ }
+ }
+
+ /**
+ * 瀛樻斁key骞跺叧鑱旀嫾鎺ュ搴擪ey
+ */
+ private static void putKeyAndMsg(String message, String hmStrKey, HashMap hashMapStr, String protocolType, Collector> out) {
+
+ //鍜屼笂娆″瓨鍏ョ殑鏁版嵁鍏宠仈
+ if (hashMapStr.containsKey(hmStrKey)) {
+
+ JSONObject jsonCombinObject = new JSONObject();
+ String[] strArr = new String[2];
+ String firstMsg = hashMapStr.remove(hmStrKey);
+
+ JSONObject firstSipOrRtpLog = JSONObject.parseObject(firstMsg);
+ JSONObject secendSipOrRtpLog = JSONObject.parseObject(message);
+
+ //1锛歝2s锛2锛歴2c锛3锛沝ouble,1琛ㄧずfirstMsg涓鸿姹備晶(c2s),鍚堝苟鏃朵互瀹冧负鍑
+ if (firstSipOrRtpLog.getInteger(JsonProConfig.STREAM_DIR) == 1) {
+ strArr[0] = message;
+ strArr[1] = firstMsg;
+ } else {
+ strArr[0] = firstMsg;
+ strArr[1] = message;
+ }
+ jsonCombinObject.putAll(JSONObject.parseObject(strArr[0]));
+ jsonCombinObject.putAll(JSONObject.parseObject(strArr[1]));
+ String sipTwoMsg = jsonCombinObject.toString();
+
+
+ JSONObject sipOrRtpCombin = JSONObject.parseObject(sipTwoMsg);
+ accumulateMsg(firstSipOrRtpLog, secendSipOrRtpLog, sipOrRtpCombin);
+ sipOrRtpCombin.put(JsonProConfig.STREAM_DIR, JsonProConfig.DOUBLE);
+ if (JsonProConfig.SIP_MARK.equals(protocolType)) {
+ //鎵嬪姩鍏宠仈SIP鍚庡尯鍒嗗唴澶栫綉IP鍐嶄笅鍙
+ separateInnerIp(sipOrRtpCombin, out);
+ } else if (JsonProConfig.RTP_MARK.equals(protocolType)) {
+ //鎵嬪姩鍏宠仈RTP鍚庢寜鍥涘厓缁勪笅鍙
+ sipOrRtpCombin.put(JsonProConfig.RTP_PCAP_PATH, setRtpPath(firstSipOrRtpLog, secendSipOrRtpLog));
+ out.collect(new Tuple3<>(hmStrKey, "rtp-two", JSONObject.toJSONString(sipOrRtpCombin)));
+ }
+ } else {
+ hashMapStr.put(hmStrKey, message);
+ }
+ }
+
+ /**
+ * 鍖哄垎SIP鐨勫唴澶栫綉IP,姝ゆ椂宸茬粡鍏宠仈瀹屾垚鍖呭惈鍥涘厓缁,浣嗘湭鍖哄垎鍐呭缃慖P
+ */
+ private static void separateInnerIp(JSONObject object, Collector> out) {
+
+ String sipOriginatorIp = object.getString(JsonProConfig.SIP_ORIGINATOR_IP);
+ String sipResponderIp = object.getString(JsonProConfig.SIP_RESPONDER_IP);
+ int sipOriginatorPort = object.getInteger(JsonProConfig.SIP_ORIGINATOR_PORT);
+ int sipResponderPort = object.getInteger(JsonProConfig.SIP_RESPONDER_PORT);
+
+ if (IPUtils.isInnerIp(sipOriginatorIp)
+ || IPUtils.isInnerIp(sipResponderIp)) {
+ /**
+ * 鎸塮rom-ip_from-port_to-ip_to-port
+ */
+ String sipInnerEmitKey = sipOriginatorIp + VoipRelationConfig.CORRELATION_STR
+ + sipOriginatorPort + VoipRelationConfig.CORRELATION_STR
+ + sipResponderIp + VoipRelationConfig.CORRELATION_STR
+ + sipResponderPort;
+ //鍖呭惈鍐呯綉IP鐨凷IP鍏宠仈鍚庢暟鎹
+ out.collect(new Tuple3<>(sipInnerEmitKey, "sip-in", JSONObject.toJSONString(object)));
+ } else {
+ String sipIpPort4Key = getFourKey(sipOriginatorIp,
+ sipOriginatorPort,
+ sipResponderIp,
+ sipResponderPort);
+
+ //鎸夌収鍥涘厓缁勭殑Key鍙戦佸埌涓嬩竴涓猙olt
+ out.collect(new Tuple3<>(sipIpPort4Key, "sip-two", JSONObject.toJSONString(object)));
+ }
+ }
+
+ /**
+ * 鑾峰緱鍥涘厓缁刱ey
+ *
+ * @param commonClientIp 瀹㈡埛绔疘P
+ * @param commonClientPort 瀹㈡埛绔鍙
+ * @param commonServerIp 鏈嶅姟绔疘P
+ * @param commonServerPort 鏈嶅姟绔鍙
+ * @return 姣旇緝鎷兼帴鍚庣殑鍥涘厓缁
+ */
+ private static String getFourKey(String commonClientIp, int commonClientPort, String commonServerIp, int commonServerPort) {
+ String ipPort4Key = "";
+ int comparePortResult = compareNum(commonClientPort, commonServerPort);
+
+ /*
+ * 鎸夌鍙f瘮杈
+ */
+ switch (comparePortResult) {
+ //common_client_port > commonServerPort
+ case 1:
+ ipPort4Key = commonServerIp + VoipRelationConfig.CORRELATION_STR
+ + commonServerPort + VoipRelationConfig.CORRELATION_STR
+ + commonClientIp + VoipRelationConfig.CORRELATION_STR
+ + commonClientPort;
+ break;
+ //common_client_port < commonServerPort
+ case -1:
+ ipPort4Key = commonClientIp + VoipRelationConfig.CORRELATION_STR
+ + commonClientPort + VoipRelationConfig.CORRELATION_STR
+ + commonServerIp + VoipRelationConfig.CORRELATION_STR
+ + commonServerPort;
+ break;
+ //common_client_port = commonServerPort,寮濮嬫寜鐓P姣旇緝
+ case 0:
+ ipPort4Key = compareIp(commonClientIp, commonServerIp, commonClientPort, commonServerPort);
+ break;
+ //port绔彛鍊煎紓甯
+ case -2:
+ default:
+ logger.error("compareNum is error," +
+ "common_client_port:" + commonClientPort + "," +
+ "commonServerPort:" + commonServerPort);
+ break;
+ }
+
+ return ipPort4Key;
+ }
+
+ /**
+ * 姣旇緝IP,骞朵綔key鐨勬嫾鎺
+ *
+ * @param commonClientIp
+ * @param commonServerIp
+ * @param commonClientPort
+ * @param commonServerPort
+ * @return
+ */
+ private static String compareIp(String commonClientIp, String commonServerIp, int commonClientPort, int commonServerPort) {
+ long clientIpNum = IPUtils.ipToLong(commonClientIp);
+ long serverIpNum = IPUtils.ipToLong(commonServerIp);
+ int compareIpResult = compareNum(clientIpNum, serverIpNum);
+ switch (compareIpResult) {
+ //clientIpNum > serverIpNum
+ case 1:
+ return commonServerIp + VoipRelationConfig.CORRELATION_STR
+ + commonServerPort + VoipRelationConfig.CORRELATION_STR
+ + commonClientIp + VoipRelationConfig.CORRELATION_STR
+ + commonClientPort;
+ //clientIpNum < serverIpNum
+ case -1:
+ return commonClientIp + VoipRelationConfig.CORRELATION_STR
+ + commonClientPort + VoipRelationConfig.CORRELATION_STR
+ + commonServerIp + VoipRelationConfig.CORRELATION_STR
+ + commonServerPort;
+ //clientIpNum = serverIpNum,璇存槑涓や釜IP鍊间竴鏍凤紝鍗矷P寮傚父
+ case 0:
+ //IP鍊煎紓甯
+ case -2:
+ default:
+ logger.error("compareNum is error," +
+ "common_client_ip:" + commonClientIp + "," +
+ "commonServerIp:" + commonServerIp + "," +
+ "commonClientPort:" + commonClientPort + "," +
+ "commonServerPort:" + commonServerPort);
+ return "";
+ }
+ }
+
+ /**
+ * 璁$畻鐩稿叧瀛楄妭淇℃伅,涓昏鏄疮鍔
+ *
+ * @param firstSipOrRtpLog
+ * @param secendSipOrRtpLog
+ * @param sipOrRtpCombin
+ */
+ private static void accumulateMsg(JSONObject firstSipOrRtpLog, JSONObject secendSipOrRtpLog, JSONObject sipOrRtpCombin) {
+ //common_sessions
+ sipOrRtpCombin.put(JsonProConfig.SESSIONS, (firstSipOrRtpLog.getLongValue(JsonProConfig.SESSIONS) + secendSipOrRtpLog.getLongValue(JsonProConfig.SESSIONS)));
+
+ //common_c2s_pkt_num
+ sipOrRtpCombin.put(JsonProConfig.C2S_PKT_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_PKT_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_PKT_NUM)));
+
+ //common_s2c_pkt_num
+ sipOrRtpCombin.put(JsonProConfig.S2C_PKT_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_PKT_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_PKT_NUM)));
+
+ //common_c2s_byte_num
+ sipOrRtpCombin.put(JsonProConfig.C2S_BYTE_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_BYTE_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_BYTE_NUM)));
+
+ //common_s2c_byte_num
+ sipOrRtpCombin.put(JsonProConfig.S2C_BYTE_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_BYTE_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_BYTE_NUM)));
+
+ //common_c2s_ipfrag_num
+ sipOrRtpCombin.put(JsonProConfig.C2S_IPFRAG_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_IPFRAG_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_IPFRAG_NUM)));
+
+ //common_s2c_ipfrag_num
+ sipOrRtpCombin.put(JsonProConfig.S2C_IPFRAG_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_IPFRAG_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_IPFRAG_NUM)));
+
+ //common_c2s_tcp_lostlen
+ sipOrRtpCombin.put(JsonProConfig.C2S_TCP_LOSTLEN, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_LOSTLEN) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_LOSTLEN)));
+
+ //common_s2c_tcp_lostlen
+ sipOrRtpCombin.put(JsonProConfig.S2C_TCP_LOSTLEN, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_LOSTLEN) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_LOSTLEN)));
+
+ //common_c2s_tcp_unorder_num
+ sipOrRtpCombin.put(JsonProConfig.C2S_TCP_UNORDER_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_UNORDER_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.C2S_TCP_UNORDER_NUM)));
+
+ //common_s2c_tcp_unorder_num
+ sipOrRtpCombin.put(JsonProConfig.S2C_TCP_UNORDER_NUM, (firstSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_UNORDER_NUM) + secendSipOrRtpLog.getLongValue(JsonProConfig.S2C_TCP_UNORDER_NUM)));
+ }
+
+ /**
+ * int绫诲瀷
+ * 姣旇緝鏁板瓧澶у皬,宸﹁竟>鍙宠竟-杩斿洖:1,宸﹁竟<鍙宠竟-杩斿洖:-1,宸﹁竟=鍙宠竟-杩斿洖:0
+ *
+ * @param numOne
+ * @param numTwo
+ */
+ private static int compareNum(int numOne, int numTwo) {
+ if (numOne > 0 && numTwo > 0) {
+ return Integer.compare(numOne, numTwo);
+ } else {
+ return -2;
+ }
+ }
+
+ /**
+ * long绫诲瀷
+ * 姣旇緝鏁板瓧澶у皬,宸﹁竟>鍙宠竟-杩斿洖:1,宸﹁竟<鍙宠竟-杩斿洖:-1,宸﹁竟=鍙宠竟-杩斿洖:0
+ *
+ * @param numOne
+ * @param numTwo
+ */
+ private static int compareNum(long numOne, long numTwo) {
+ if (numOne > 0 && numTwo > 0) {
+ return Long.compare(numOne, numTwo);
+ } else {
+ return -2;
+ }
+ }
+
+ /**
+ * 鍒ゆ柇RTP鍗曞悜娴佸鍑嗗悗鏄惁瀛樺湪澶氫釜鏂囦欢锛岃嫢鐩稿悓鍒欒繑鍥炰换鎰忎竴涓紝鑻ヤ笉鍚屽垯鎷兼帴杩斿洖
+ *
+ * @param firstSipOrRtpLog 绗竴涓崟鍚戞祦鏃ュ織
+ * @param secendSipOrRtpLog 绗簩涓崟鍚戞祦鏃ュ織
+ * @return 鏂囦欢璺緞
+ */
+ private static String setRtpPath(Map firstSipOrRtpLog, Map secendSipOrRtpLog) {
+
+ String firstRtpPcapPath = JsonParseUtil.getString(firstSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
+ String secendRtpPcapPath = JsonParseUtil.getString(secendSipOrRtpLog, JsonProConfig.RTP_PCAP_PATH);
+
+ if (StringUtil.isNotBlank(firstRtpPcapPath) && StringUtil.isNotBlank(secendRtpPcapPath)) {
+ if (firstRtpPcapPath.equals(secendRtpPcapPath)) {
+ return firstRtpPcapPath;
+ } else {
+ return firstRtpPcapPath + ";" + secendRtpPcapPath;
+ }
+ } else if (StringUtil.isNotBlank(firstRtpPcapPath)) {
+ return firstRtpPcapPath;
+ } else {
+ return secendRtpPcapPath;
+ }
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java
new file mode 100644
index 0000000..29b55a1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java
@@ -0,0 +1,327 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.common.JsonProConfig;
+import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2113:55
+ */
+public class SipCalibrationWindow extends ProcessAllWindowFunction, String, TimeWindow> {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 瀹炰綋绫诲弽灏刴ap
+ */
+ private static HashMap classMap = JsonParseUtil.getMapFromHttp(VoipRelationConfig.SCHEMA_HTTP);
+
+ /**
+ * 鍙嶅皠鎴愪竴涓被
+ */
+ private static Object voipObject = JsonParseUtil.generateObject(classMap);
+
+ /**
+ * 鍏宠仈鐢℉ashMap
+ * key---鍥涘厓缁
+ * value---List瀛樻斁瀵瑰簲SIP鎴栬匯TP鏁版嵁
+ * 瀛樻斁鏁版嵁:rtp-single,rtp-two,sip-two
+ * 涓嶅瓨鏀剧殑鏁版嵁:sip-single涓巗ip-in
+ */
+ private static HashMap> combineSRHmList = new HashMap<>(16);
+
+ /**
+ * 浜屾鍏宠仈鐢℉ashMap
+ * key---鍥涘厓缁
+ * value---List瀛樻斁瀵瑰簲SIP鎴栬匯TP鏁版嵁
+ * 瀛樻斁鏁版嵁:rtp-single,rtp-two,sip-two
+ * 涓嶅瓨鏀剧殑鏁版嵁:sip-single涓巗ip-in
+ */
+ private static HashMap> secCombineSRHmList = new HashMap<>(16);
+
+ @Override
+ public void process(Context context, Iterable> input, Collector output) throws Exception {
+ for (Tuple3 tuple : input) {
+ //鎷兼帴鐨勫洓鍏冪粍
+ String fourKey = tuple.f0;
+ //宸插叧鑱旂殑sip,rtp;鏈叧鑱旂殑sip,rtp;鍐呯綉鐨剆ip
+ String type = tuple.f1;
+ String msg = tuple.f2;
+ switch (type) {
+ //鍗曞悜娴佸鍑嗗悗鐨凷IP
+ case "sip-two":
+ //鍗曞悜娴佸鍑嗗悗鐨凴TP
+ case "rtp-two":
+ //瀵逛笉涓婄殑RTP
+ case "rtp-single":
+ putKeyAndMsg(msg, fourKey, combineSRHmList);
+ break;
+ //鍗曞悜娴佺殑SIP
+ case "sip-single":
+ //鍐呯綉鐨凷IP
+ case "sip-in":
+ output.collect(msg);
+ break;
+ default:
+ logger.error("type is beyond expectation:" + type);
+ break;
+
+ }
+ }
+ //鍒濇鍏宠仈
+ tickCombineHmList(combineSRHmList, output);
+ //鍜岀紦瀛樹腑鐨勬暟鎹簩娆″叧鑱
+ tickCombineHmList(secCombineSRHmList, output);
+ }
+
+
+ /**
+ * 瀹氭椂鍏宠仈,鍖呮嫭鍒濇鍏宠仈浠ュ強鍚庣画浜屾鍏宠仈
+ *
+ * @param combineHmList
+ */
+ private void tickCombineHmList(HashMap> combineHmList, Collector output) {
+ if (combineHmList.size() > 0) {
+ long nowTime = System.currentTimeMillis() / 1000;
+
+ HashMap> tempCombineSRhmList = new HashMap<>(combineHmList);
+ combineHmList.clear();
+
+ for (String fourStrKey : tempCombineSRhmList.keySet()) {
+
+ LinkedList tempList = tempCombineSRhmList.get(fourStrKey);
+ //鍖呭惈SIP鍜孯TP
+ int listSize = tempList.size();
+ System.out.println(listSize);
+ if (listSize > 1) {
+
+ List sipBeanArr = new ArrayList<>();
+ List rtpBeanArr = new ArrayList<>();
+
+ for (String message : tempList) {
+ Object tempSipOrRtpLog = JSONObject.parseObject(message, voipObject.getClass());
+ String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE);
+ if (JsonProConfig.SIP_MARK.equals(schemaType)) {
+ sipBeanArr.add(message);
+ } else if (JsonProConfig.RTP_MARK.equals(schemaType)) {
+ rtpBeanArr.add(message);
+ }
+ }
+ int rtpSize = rtpBeanArr.size();
+ int sipSize = sipBeanArr.size();
+
+ if (rtpSize == 1 && sipSize >= 1) {
+ for (String sipMessage : sipBeanArr) {
+ Object rtpLog = JSONObject.parseObject(rtpBeanArr.get(0), voipObject.getClass());
+ Object voipLog = JSONObject.parseObject(sipMessage, voipObject.getClass());
+ accumulateVoipMsg(voipLog, rtpLog);
+ JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
+ JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
+ //鍥涘厓缁,voip,鍏宠仈鍚庣殑鏁版嵁
+ output.collect(mergeJson(voipLog, rtpLog));
+// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
+ }
+ } else if (sipSize == 1 && rtpSize >= 1) {
+ for (String rtpMessage : rtpBeanArr) {
+ Object rtpLog = JSONObject.parseObject(rtpMessage, voipObject.getClass());
+ Object voipLog = JSONObject.parseObject(sipBeanArr.get(0), voipObject.getClass());
+ accumulateVoipMsg(voipLog, rtpLog);
+ JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
+ JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
+ //鍥涘厓缁,voip,鍏宠仈鍚庣殑鏁版嵁
+ output.collect(mergeJson(voipLog, rtpLog));
+// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
+ }
+ } else {
+ logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical");
+ sendErrorLogToKafka(sipBeanArr, output);
+ sendErrorLogToKafka(rtpBeanArr, output);
+ }
+
+ } else {
+ String msg = tempList.get(0);
+
+ Object voipLog = JSONObject.parseObject(msg, voipObject.getClass());
+ long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME);
+ long intervalTime = nowTime - commonEndTime;
+ if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) {
+ putKeyAndMsg(msg, fourStrKey, secCombineSRHmList);
+ } else {
+ sendDirectlyOneElement(msg, voipLog, output);
+ }
+
+ }
+ }
+ }
+ }
+
+ /**
+ * 绱姞鍏宠仈鍚庣殑瀛楄妭绫诲弬鏁板
+ *
+ * @param voipLog
+ * @param rtpLog
+ */
+ private void accumulateVoipMsg(Object voipLog, Object rtpLog) {
+
+ Long sumCommonSessions = JsonParseUtil.getLong(voipLog, JsonProConfig.SESSIONS)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.SESSIONS);
+
+ Long sumCommonC2sPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_PKT_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_PKT_NUM);
+
+ Long sumCommonS2cPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_PKT_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_PKT_NUM);
+
+ Long sumCommonC2sByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_BYTE_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_BYTE_NUM);
+
+ Long sumCommonS2cByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_BYTE_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_BYTE_NUM);
+
+ Long sumCommonC2sIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_IPFRAG_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
+
+ Long sumCommonS2cIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_IPFRAG_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
+
+ Long sumCommonC2sTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_LOSTLEN)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
+
+ Long sumCommonS2cTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_LOSTLEN)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
+
+ Long sumCommonC2sTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
+
+ Long sumCommonS2cTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
+
+ //common_sessions
+ JsonParseUtil.setValue(voipLog, JsonProConfig.SESSIONS, sumCommonSessions);
+
+ //common_c2s_pkt_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_PKT_NUM, sumCommonC2sPktNum);
+ //common_s2c_pkt_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_PKT_NUM, sumCommonS2cPktNum);
+ //common_c2s_byte_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_BYTE_NUM, sumCommonC2sByteNum);
+ //common_s2c_byte_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_BYTE_NUM, sumCommonS2cByteNum);
+
+ //common_c2s_ipfrag_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_IPFRAG_NUM, sumCommonC2sIpfragNum);
+ //common_s2c_ipfrag_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_IPFRAG_NUM, sumCommonS2cIpfragNum);
+
+ //common_c2s_tcp_lostlen
+ JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_LOSTLEN, sumCommonC2sTcpLostlen);
+ //common_s2c_tcp_lostlen
+ JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_LOSTLEN, sumCommonS2cTcpLostlen);
+
+ //common_c2s_tcp_unorder_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM, sumCommonC2sTcpUnorderNum);
+ //common_s2c_tcp_unorder_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM, sumCommonS2cTcpUnorderNum);
+
+ }
+
+ /**
+ * 瀹氭椂澶勭悊涓璍ist鍏冪礌鏁颁粎涓1鐨勬儏鍐
+ */
+ private void sendDirectlyOneElement(String msg, Object voipLog, Collector output) {
+ //鍥涘厓缁,sip(涓瀹氫负鍙屼晶)/rtp(鍙兘涓哄崟渚т篃鍙兘涓哄弻渚,鐪嬪崟鍚戞祦瀛楁淇℃伅),鎷垮嚭鏉ョ殑鍘熷鏁版嵁
+ String commonSchemaType = JsonParseUtil.getString(voipLog, JsonProConfig.SCHEMA_TYPE);
+ if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
+ output.collect(msg);
+ } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
+ int commonStreamDir = JsonParseUtil.getInteger(voipLog, JsonProConfig.STREAM_DIR);
+ if (commonStreamDir != JsonProConfig.DOUBLE) {
+ output.collect(msg);
+ } else {
+ output.collect(msg);
+ }
+ }
+ }
+
+
+ /**
+ * 瀛樻斁key骞舵坊鍔犲搴擫ist
+ */
+ private static void putKeyAndMsg(String message, String fourStrKey, HashMap> combineSRHmList) {
+ if (combineSRHmList.containsKey(fourStrKey)) {
+ LinkedList tmpList = combineSRHmList.get(fourStrKey);
+ tmpList.add(message);
+ combineSRHmList.put(fourStrKey, tmpList);
+ } else {
+ LinkedList tmpList = new LinkedList<>();
+ tmpList.add(message);
+ combineSRHmList.put(fourStrKey, tmpList);
+ }
+ }
+
+ /**
+ * 鍒ゆ柇RTP涓诲彨鏂瑰悜-娴嬭瘯
+ *
+ * @param rtpLog RTP鍘熷鏃ュ織
+ * @param voipLog 铻嶅悎鍚嶸OIP鏃ュ織
+ * @return 鏂瑰悜 0锛氭湭鐭 1锛歝2s 2锛歴2c
+ */
+ private static int judgeDirection(Object rtpLog, Object voipLog) {
+
+ String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP);
+ String sipOriginatorIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_ORIGINATOR_IP);
+ String sipResponderIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_RESPONDER_IP);
+
+ if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) {
+ return 1;
+ } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) {
+ return 2;
+ }
+
+ return 0;
+ }
+
+ /**
+ * 鍙戦佷笉绗﹀悎閫昏緫鐨勬棩蹇楀埌kafka
+ */
+ private static void sendErrorLogToKafka(List logList, Collector output) {
+ if (logList.size() > 0) {
+ for (String log : logList) {
+ output.collect(log);
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static String mergeJson(Object voipLog, Object rtpLog) {
+
+ int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S);
+ int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C);
+ String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH);
+
+ JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
+ JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
+ JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);
+
+ return JSONObject.toJSONString(voipLog);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
new file mode 100644
index 0000000..2215d5b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
@@ -0,0 +1,324 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.common.JsonProConfig;
+import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.*;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2113:55
+ */
+public class SipCalibrationWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 瀹炰綋绫诲弽灏刴ap
+ */
+ private static HashMap classMap = JsonParseUtil.getMapFromHttp(VoipRelationConfig.SCHEMA_HTTP);
+
+ /**
+ * 鍙嶅皠鎴愪竴涓被
+ */
+ private static Object voipObject = JsonParseUtil.generateObject(classMap);
+
+ /**
+ * 鍏宠仈鐢℉ashMap
+ * key---鍥涘厓缁
+ * value---List瀛樻斁瀵瑰簲SIP鎴栬匯TP鏁版嵁
+ * 瀛樻斁鏁版嵁:rtp-single,rtp-two,sip-two
+ * 涓嶅瓨鏀剧殑鏁版嵁:sip-single涓巗ip-in
+ */
+ private static HashMap> combineSRHmList = new HashMap<>(16);
+
+ /**
+ * 浜屾鍏宠仈鐢℉ashMap
+ * key---鍥涘厓缁
+ * value---List瀛樻斁瀵瑰簲SIP鎴栬匯TP鏁版嵁
+ * 瀛樻斁鏁版嵁:rtp-single,rtp-two,sip-two
+ * 涓嶅瓨鏀剧殑鏁版嵁:sip-single涓巗ip-in
+ */
+ private static HashMap> secCombineSRHmList = new HashMap<>(16);
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void process(String key, Context context, Iterable> input, Collector output) throws Exception {
+ for (Tuple3 tuple : input) {
+ //鎷兼帴鐨勫洓鍏冪粍
+ String fourKey = tuple.f0;
+ //宸插叧鑱旂殑sip,rtp;鏈叧鑱旂殑sip,rtp;鍐呯綉鐨剆ip
+ String type = tuple.f1;
+ String msg = tuple.f2;
+ switch (type) {
+ //鍗曞悜娴佸鍑嗗悗鐨凷IP
+ case "sip-two":
+ //鍗曞悜娴佸鍑嗗悗鐨凴TP
+ case "rtp-two":
+ //瀵逛笉涓婄殑RTP
+ case "rtp-single":
+ putKeyAndMsg(msg, fourKey, combineSRHmList);
+ break;
+ //鍗曞悜娴佺殑SIP
+ case "sip-single":
+ //鍐呯綉鐨凷IP
+ case "sip-in":
+ output.collect(msg);
+ break;
+ default:
+ logger.error("type is beyond expectation:" + type);
+ break;
+
+ }
+ }
+ //鍒濇鍏宠仈
+ tickCombineHmList(combineSRHmList, output);
+ //鍜岀紦瀛樹腑鐨勬暟鎹簩娆″叧鑱
+ tickCombineHmList(secCombineSRHmList, output);
+ }
+
+ /**
+ * 瀹氭椂鍏宠仈,鍖呮嫭鍒濇鍏宠仈浠ュ強鍚庣画浜屾鍏宠仈
+ *
+ * @param combineHmList
+ */
+ private void tickCombineHmList(HashMap> combineHmList, Collector output) {
+ if (combineHmList.size() > 0) {
+ long nowTime = System.currentTimeMillis() / 1000;
+
+ HashMap> tempCombineSRhmList = new HashMap<>(combineHmList);
+ combineHmList.clear();
+
+ for (String fourStrKey : tempCombineSRhmList.keySet()) {
+
+ LinkedList tempList = tempCombineSRhmList.get(fourStrKey);
+ //鍖呭惈SIP鍜孯TP
+ int listSize = tempList.size();
+ System.out.println(listSize);
+ if (listSize > 1) {
+
+ List sipBeanArr = new ArrayList<>();
+ List rtpBeanArr = new ArrayList<>();
+
+ for (String message : tempList) {
+ Object tempSipOrRtpLog = JSONObject.parseObject(message, voipObject.getClass());
+ String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE);
+ if (JsonProConfig.SIP_MARK.equals(schemaType)) {
+ sipBeanArr.add(message);
+ } else if (JsonProConfig.RTP_MARK.equals(schemaType)) {
+ rtpBeanArr.add(message);
+ }
+ }
+ int rtpSize = rtpBeanArr.size();
+ int sipSize = sipBeanArr.size();
+
+ if (rtpSize == 1 && sipSize >= 1) {
+ for (String sipMessage : sipBeanArr) {
+ Object rtpLog = JSONObject.parseObject(rtpBeanArr.get(0), voipObject.getClass());
+ Object voipLog = JSONObject.parseObject(sipMessage, voipObject.getClass());
+ accumulateVoipMsg(voipLog, rtpLog);
+ JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
+ JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
+ //鍥涘厓缁,voip,鍏宠仈鍚庣殑鏁版嵁
+ output.collect(mergeJson(voipLog, rtpLog));
+// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
+ }
+ } else if (sipSize == 1 && rtpSize >= 1) {
+ for (String rtpMessage : rtpBeanArr) {
+ Object rtpLog = JSONObject.parseObject(rtpMessage, voipObject.getClass());
+ Object voipLog = JSONObject.parseObject(sipBeanArr.get(0), voipObject.getClass());
+ accumulateVoipMsg(voipLog, rtpLog);
+ JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
+ JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
+ //鍥涘厓缁,voip,鍏宠仈鍚庣殑鏁版嵁
+ output.collect(mergeJson(voipLog, rtpLog));
+// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
+ }
+ } else {
+ logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical");
+ sendErrorLogToKafka(sipBeanArr, output);
+ sendErrorLogToKafka(rtpBeanArr, output);
+ }
+
+ } else {
+ String msg = tempList.get(0);
+
+ Object voipLog = JSONObject.parseObject(msg, voipObject.getClass());
+ long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME);
+ long intervalTime = nowTime - commonEndTime;
+ if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) {
+ putKeyAndMsg(msg, fourStrKey, secCombineSRHmList);
+ } else {
+ sendDirectlyOneElement(msg, voipLog, output);
+ }
+
+ }
+ }
+ }
+ }
+
+ /**
+ * 绱姞鍏宠仈鍚庣殑瀛楄妭绫诲弬鏁板
+ *
+ * @param voipLog
+ * @param rtpLog
+ */
+ private void accumulateVoipMsg(Object voipLog, Object rtpLog) {
+
+ Long sumCommonSessions = JsonParseUtil.getLong(voipLog, JsonProConfig.SESSIONS)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.SESSIONS);
+
+ Long sumCommonC2sPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_PKT_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_PKT_NUM);
+
+ Long sumCommonS2cPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_PKT_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_PKT_NUM);
+
+ Long sumCommonC2sByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_BYTE_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_BYTE_NUM);
+
+ Long sumCommonS2cByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_BYTE_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_BYTE_NUM);
+
+ Long sumCommonC2sIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_IPFRAG_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
+
+ Long sumCommonS2cIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_IPFRAG_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
+
+ Long sumCommonC2sTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_LOSTLEN)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
+
+ Long sumCommonS2cTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_LOSTLEN)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
+
+ Long sumCommonC2sTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
+
+ Long sumCommonS2cTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM)
+ + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
+
+ //common_sessions
+ JsonParseUtil.setValue(voipLog, JsonProConfig.SESSIONS, sumCommonSessions);
+
+ //common_c2s_pkt_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_PKT_NUM, sumCommonC2sPktNum);
+ //common_s2c_pkt_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_PKT_NUM, sumCommonS2cPktNum);
+ //common_c2s_byte_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_BYTE_NUM, sumCommonC2sByteNum);
+ //common_s2c_byte_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_BYTE_NUM, sumCommonS2cByteNum);
+
+ //common_c2s_ipfrag_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_IPFRAG_NUM, sumCommonC2sIpfragNum);
+ //common_s2c_ipfrag_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_IPFRAG_NUM, sumCommonS2cIpfragNum);
+
+ //common_c2s_tcp_lostlen
+ JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_LOSTLEN, sumCommonC2sTcpLostlen);
+ //common_s2c_tcp_lostlen
+ JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_LOSTLEN, sumCommonS2cTcpLostlen);
+
+ //common_c2s_tcp_unorder_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM, sumCommonC2sTcpUnorderNum);
+ //common_s2c_tcp_unorder_num
+ JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM, sumCommonS2cTcpUnorderNum);
+
+ }
+
+ /**
+ * 瀹氭椂澶勭悊涓璍ist鍏冪礌鏁颁粎涓1鐨勬儏鍐
+ */
+ private void sendDirectlyOneElement(String msg, Object voipLog, Collector output) {
+ //鍥涘厓缁,sip(涓瀹氫负鍙屼晶)/rtp(鍙兘涓哄崟渚т篃鍙兘涓哄弻渚,鐪嬪崟鍚戞祦瀛楁淇℃伅),鎷垮嚭鏉ョ殑鍘熷鏁版嵁
+ String commonSchemaType = JsonParseUtil.getString(voipLog, JsonProConfig.SCHEMA_TYPE);
+ if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
+ output.collect(msg);
+ } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
+ int commonStreamDir = JsonParseUtil.getInteger(voipLog, JsonProConfig.STREAM_DIR);
+ if (commonStreamDir != JsonProConfig.DOUBLE) {
+ output.collect(msg);
+ } else {
+ output.collect(msg);
+ }
+ }
+ }
+
+
+ /**
+ * 瀛樻斁key骞舵坊鍔犲搴擫ist
+ */
+ private static void putKeyAndMsg(String message, String fourStrKey, HashMap> combineSRHmList) {
+ if (combineSRHmList.containsKey(fourStrKey)) {
+ LinkedList tmpList = combineSRHmList.get(fourStrKey);
+ tmpList.add(message);
+ combineSRHmList.put(fourStrKey, tmpList);
+ } else {
+ LinkedList tmpList = new LinkedList<>();
+ tmpList.add(message);
+ combineSRHmList.put(fourStrKey, tmpList);
+ }
+ }
+
+ /**
+ * 鍒ゆ柇RTP涓诲彨鏂瑰悜-娴嬭瘯
+ *
+ * @param rtpLog RTP鍘熷鏃ュ織
+ * @param voipLog 铻嶅悎鍚嶸OIP鏃ュ織
+ * @return 鏂瑰悜 0锛氭湭鐭 1锛歝2s 2锛歴2c
+ */
+ private static int judgeDirection(Object rtpLog, Object voipLog) {
+
+ String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP);
+ String sipOriginatorIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_ORIGINATOR_IP);
+ String sipResponderIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_RESPONDER_IP);
+
+ if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) {
+ return 1;
+ } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) {
+ return 2;
+ }
+
+ return 0;
+ }
+
+ /**
+ * 鍙戦佷笉绗﹀悎閫昏緫鐨勬棩蹇楀埌kafka
+ */
+ private static void sendErrorLogToKafka(List logList, Collector output) {
+ if (logList.size() > 0) {
+ for (String log : logList) {
+ output.collect(log);
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static String mergeJson(Object voipLog, Object rtpLog) {
+
+ long rtpPayloadTypeC2s = JsonParseUtil.getLong(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S);
+ long rtpPayloadTypeS2c = JsonParseUtil.getLong(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C);
+ String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH);
+
+ JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
+ JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
+ JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);
+
+ return JSONObject.toJSONString(voipLog);
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
new file mode 100644
index 0000000..1adb1d1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
@@ -0,0 +1,77 @@
+package com.zdjizhi.utils.http;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * 鑾峰彇缃戝叧schema鐨勫伐鍏风被
+ *
+ * @author qidaijie
+ */
+public class HttpClientUtil {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 璇锋眰缃戝叧鑾峰彇schema
+ *
+ * @param http 缃戝叧url
+ * @return schema
+ */
+ public static String requestByGetMethod(String http) {
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ StringBuilder entityStringBuilder;
+
+ HttpGet get = new HttpGet(http);
+ BufferedReader bufferedReader = null;
+ CloseableHttpResponse httpResponse = null;
+ try {
+ httpResponse = httpClient.execute(get);
+ HttpEntity entity = httpResponse.getEntity();
+ entityStringBuilder = new StringBuilder();
+ if (null != entity) {
+ bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
+ int intC;
+ while ((intC = bufferedReader.read()) != -1) {
+ char c = (char) intC;
+ if (c == '\n') {
+ break;
+ }
+ entityStringBuilder.append(c);
+ }
+
+ return entityStringBuilder.toString();
+ }
+ } catch (IOException e) {
+ logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
+ } finally {
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (httpResponse != null) {
+ try {
+ httpResponse.close();
+ } catch (IOException e) {
+ logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (bufferedReader != null) {
+ IOUtils.closeQuietly(bufferedReader);
+ }
+ }
+ return "";
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/ip/IPUtils.java b/src/main/java/com/zdjizhi/utils/ip/IPUtils.java
new file mode 100644
index 0000000..1757194
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ip/IPUtils.java
@@ -0,0 +1,96 @@
+package com.zdjizhi.utils.ip;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.IPUtil;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.exception.VoipRelationException;
+
+/**
+ * IP杞崲宸ュ叿绫
+ *
+ * @author Colbert
+ * @date 2021/03/16
+ */
+public class IPUtils {
+ private static final Log logger = LogFactory.get();
+
+ private static final long A_BEGIN = ipToLong("10.0.0.0");
+ private static final long A_END = ipToLong("10.255.255.255");
+ private static final long B_BEGIN = ipToLong("172.16.0.0");
+ private static final long B_END = ipToLong("172.31.255.255");
+ private static final long C_BEGIN = ipToLong("192.168.0.0");
+ private static final long C_END = ipToLong("192.168.255.255");
+
+ /**
+ * 灏127.0.0.1褰㈠紡鐨処P鍦板潃杞崲鎴愬崄杩涘埗鏁存暟
+ *
+ * @param strIp
+ * @return
+ */
+ public static long ipToLong(String strIp) {
+ try {
+ if (StringUtil.isBlank(strIp)) {
+ logger.error("IPUtils.ipToLong input IP is null!!!");
+ return 0L;
+ }
+ long[] ip = new long[4];
+ int position1 = strIp.indexOf(".");
+ int position2 = strIp.indexOf(".", position1 + 1);
+ int position3 = strIp.indexOf(".", position2 + 1);
+
+ ip[0] = Long.parseLong(strIp.substring(0, position1));
+ ip[1] = Long.parseLong(strIp.substring(position1 + 1, position2));
+ ip[2] = Long.parseLong(strIp.substring(position2 + 1, position3));
+ ip[3] = Long.parseLong(strIp.substring(position3 + 1));
+ return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3];
+ } catch (VoipRelationException e) {
+ logger.error("IPUtils.ipToLong input IP is:" + strIp + ",convert IP to Long is error:" + e.getMessage());
+ return 0L;
+ }
+
+ }
+
+ /**
+ * 灏嗗崄杩涘埗鏁存暟褰㈠紡杞崲鎴127.0.0.1褰㈠紡鐨刬p鍦板潃
+ *
+ * @param longIp
+ * @return
+ */
+ public static String longToIp(long longIp) {
+ StringBuffer sb = new StringBuffer("");
+ sb.append(String.valueOf((longIp >>> 24)));
+ sb.append(".");
+ sb.append(String.valueOf((longIp & 0x00FFFFFF) >>> 16));
+ sb.append(".");
+ sb.append(String.valueOf((longIp & 0x0000FFFF) >>> 8));
+ sb.append(".");
+ sb.append(String.valueOf((longIp & 0x000000FF)));
+ return sb.toString();
+ }
+
+
+ /**
+ * 鏄惁涓哄唴缃慖P
+ *
+ * @param ipAddress
+ * @return
+ */
+ public static boolean isInnerIp(String ipAddress) {
+ if (StringUtil.isBlank(ipAddress) || IPUtil.internalIp(ipAddress)) {
+ //涓虹┖鎴栬呬负鐗瑰畾IP鏃朵篃绠椾綔鍐呯綉IP
+ return true;
+ }
+
+ boolean isInnerIp = false;
+ long ipNum = ipToLong(ipAddress);
+
+ isInnerIp = isInner(ipNum, A_BEGIN, A_END) || isInner(ipNum, B_BEGIN, B_END) || isInner(ipNum, C_BEGIN, C_END);
+ return isInnerIp;
+ }
+
+ private static boolean isInner(long userIp, long begin, long end) {
+ return (userIp >= begin) && (userIp <= end);
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
new file mode 100644
index 0000000..2a3194f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -0,0 +1,313 @@
+package com.zdjizhi.utils.json;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.http.HttpClientUtil;
+import net.sf.cglib.beans.BeanGenerator;
+import net.sf.cglib.beans.BeanMap;
+
+import java.util.*;
+
+/**
+ * 浣跨敤FastJson瑙f瀽json鐨勫伐鍏风被
+ *
+ * @author qidaijie
+ */
+public class JsonParseUtil {
+
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 妯″紡鍖归厤锛岀粰瀹氫竴涓被鍨嬪瓧绗︿覆杩斿洖涓涓被绫诲瀷
+ *
+ * @param type 绫诲瀷
+ * @return 绫荤被鍨
+ */
+
+ public static Class getClassName(String type) {
+ Class clazz;
+
+ switch (type) {
+ case "int":
+ clazz = Integer.class;
+ break;
+ case "string":
+ clazz = String.class;
+ break;
+ case "long":
+ clazz = long.class;
+ break;
+ case "array":
+ clazz = List.class;
+ break;
+ case "double":
+ clazz = double.class;
+ break;
+ case "float":
+ clazz = float.class;
+ break;
+ case "char":
+ clazz = char.class;
+ break;
+ case "byte":
+ clazz = byte.class;
+ break;
+ case "boolean":
+ clazz = boolean.class;
+ break;
+ case "short":
+ clazz = short.class;
+ break;
+ default:
+ clazz = String.class;
+ }
+ return clazz;
+ }
+
+
+ /**
+ * 鑾峰彇灞炴у肩殑鏂规硶
+ *
+ * @param obj 瀵硅薄
+ * @param property key
+ * @return 灞炴х殑鍊
+ */
+ public static Object getValue(Object obj, String property) {
+
+ try {
+ BeanMap beanMap = BeanMap.create(obj);
+ if (beanMap.containsKey(property)) {
+ return beanMap.get(property);
+ } else {
+ return null;
+ }
+ } catch (RuntimeException e) {
+ logger.error("鑾峰彇json-value寮傚父锛屽紓甯竗ey锛" + property + "寮傚父淇℃伅涓猴細" + e);
+ return null;
+ }
+ }
+
+ /**
+ * 鑾峰彇灞炴у肩殑鏂规硶
+ *
+ * @param jsonMap 鍘熷鏃ュ織
+ * @param property key
+ * @return 灞炴х殑鍊
+ */
+ public static Object getValue(Map jsonMap, String property) {
+ try {
+ return jsonMap.getOrDefault(property, null);
+ } catch (RuntimeException e) {
+ logger.error("鑾峰彇json-value寮傚父锛屽紓甯竗ey锛" + property + "寮傚父淇℃伅涓猴細" + e);
+ return null;
+ }
+ }
+
+ /**
+ * long 绫诲瀷妫楠岃浆鎹㈡柟娉,鑻ヤ负绌鸿繑鍥炲熀纭鍊
+ *
+ * @return Long value
+ */
+ public static Long getLong(Map jsonMap, String property) {
+ Object value = jsonMap.getOrDefault(property, null);
+ Long longVal = TypeUtils.castToLong(value);
+
+ if (longVal == null) {
+ return 0L;
+ }
+
+ return longVal;
+ }
+
+ /**
+ * long 绫诲瀷妫楠岃浆鎹㈡柟娉,鑻ヤ负绌鸿繑鍥炲熀纭鍊
+ *
+ * @return Long value
+ */
+ public static Long getLong(Object jsonMap, String property) {
+ Object value = getValue(jsonMap, property);
+ Long longVal = TypeUtils.castToLong(value);
+
+ if (longVal == null) {
+ return 0L;
+ }
+
+ return longVal;
+ }
+
+ /**
+ * int 绫诲瀷妫楠岃浆鎹㈡柟娉,鑻ヤ负绌鸿繑鍥炲熀纭鍊
+ *
+ * @return int value
+ */
+ public static int getInteger(Map jsonMap, String property) {
+ Object value = jsonMap.getOrDefault(property, null);
+ Integer intVal = TypeUtils.castToInt(value);
+
+ if (intVal == null) {
+ return 0;
+ }
+ return intVal.intValue();
+ }
+
+ /**
+ * int 绫诲瀷妫楠岃浆鎹㈡柟娉,鑻ヤ负绌鸿繑鍥炲熀纭鍊
+ *
+ * @return int value
+ */
+ public static int getInteger(Object jsonMap, String property) {
+ Object value = getValue(jsonMap, property);
+ Integer intVal = TypeUtils.castToInt(value);
+
+ if (intVal == null) {
+ return 0;
+ }
+ return intVal.intValue();
+ }
+
+ public static String getString(Map jsonMap, String property) {
+ Object value = jsonMap.getOrDefault(property, null);
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ if (value instanceof List) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ return value.toString();
+ }
+
+ /**
+ * Object 鏂瑰紡鑾峰彇getString绫诲瀷瀛楁
+ *
+ * @param jsonObject
+ * @param property
+ * @return
+ */
+ public static String getString(Object jsonObject, String property) {
+ Object value = getValue(jsonObject, property);
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ if (value instanceof List) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ return value.toString();
+ }
+
+ /**
+ * 鏇存柊灞炴у肩殑鏂规硶
+ *
+ * @param jsonMap 鍘熷鏃ュ織json map
+ * @param property 鏇存柊鐨刱ey
+ * @param value 鏇存柊鐨勫
+ */
+ public static void setValue(Map jsonMap, String property, Object value) {
+ try {
+ jsonMap.put(property, value);
+ } catch (RuntimeException e) {
+ logger.error("璧嬩簣瀹炰綋绫婚敊璇被鍨嬫暟鎹", e);
+ }
+ }
+
+ /**
+ * 鏇存柊灞炴у肩殑鏂规硶
+ *
+ * @param obj 瀵硅薄
+ * @param property 鏇存柊鐨刱ey
+ * @param value 鏇存柊鐨勫
+ */
+ public static void setValue(Object obj, String property, Object value) {
+ try {
+ BeanMap beanMap = BeanMap.create(obj);
+ beanMap.put(property, value);
+ } catch (ClassCastException e) {
+ logger.error("璧嬩簣瀹炰綋绫婚敊璇被鍨嬫暟鎹", e);
+ }
+ }
+
+ /**
+ * 鏍规嵁鍙嶅皠鐢熸垚瀵硅薄鐨勬柟娉
+ *
+ * @param properties 鍙嶅皠绫荤敤鐨刴ap
+ * @return 鐢熸垚鐨凮bject绫诲瀷鐨勫璞
+ */
+ public static Object generateObject(Map properties) {
+ BeanGenerator generator = new BeanGenerator();
+ Set keySet = properties.keySet();
+ for (Object aKeySet : keySet) {
+ String key = (String) aKeySet;
+ generator.addProperty(key, (Class) properties.get(key));
+ }
+ return generator.create();
+ }
+
+ /**
+ * 閫氳繃鑾峰彇String绫诲瀷鐨勭綉鍏硈chema閾炬帴鏉ヨ幏鍙杕ap锛岀敤浜庣敓鎴愪竴涓狾bject绫诲瀷鐨勫璞
+ *
+ * @param http 缃戝叧schema鍦板潃
+ * @return 鐢ㄤ簬鍙嶅皠鐢熸垚schema绫诲瀷鐨勫璞$殑涓涓猰ap闆嗗悎
+ */
+ public static HashMap getMapFromHttp(String http) {
+ HashMap map = new HashMap<>();
+
+ String schema = HttpClientUtil.requestByGetMethod(http);
+ Object data = JSON.parseObject(schema).get("data");
+
+ //鑾峰彇fields锛屽苟杞寲涓烘暟缁勶紝鏁扮粍鐨勬瘡涓厓绱犻兘鏄竴涓猲ame doc type
+ JSONObject schemaJson = JSON.parseObject(data.toString());
+ JSONArray fields = (JSONArray) schemaJson.get("fields");
+
+ for (Object field : fields) {
+ String filedStr = field.toString();
+ if (checkKeepField(filedStr)) {
+ String name = JsonPath.read(filedStr, "$.name").toString();
+ String type = JsonPath.read(filedStr, "$.type").toString();
+ //缁勫悎鐢ㄦ潵鐢熸垚瀹炰綋绫荤殑map
+ map.put(name, getClassName(type));
+ }
+ }
+ return map;
+ }
+
+ /**
+ * 鍒ゆ柇瀛楁鏄惁闇瑕佷繚鐣
+ *
+ * @param message 鍗曚釜field-json
+ * @return true or false
+ */
+ private static boolean checkKeepField(String message) {
+ boolean isKeepField = true;
+ boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
+ if (isHiveDoc) {
+ boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
+ if (isHiveVi) {
+ String visibility = JsonPath.read(message, "$.doc.visibility").toString();
+ if (VoipRelationConfig.VISIBILITY.equals(visibility)) {
+ isKeepField = false;
+ }
+ }
+ }
+ return isKeepField;
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java
new file mode 100644
index 0000000..8562679
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java
@@ -0,0 +1,142 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.exception.VoipRelationException;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package PACKAGE_NAME
+ * @Description:
+ * @date 2021/7/1217:34
+ */
+public class JsonTypeUtils {
+ private static final Log logger = LogFactory.get();
+ /**
+ * String 绫诲瀷妫楠岃浆鎹㈡柟娉
+ *
+ * @param value json value
+ * @return String value
+ */
+ public static String checkString(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ if (value instanceof List) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ return value.toString();
+ }
+
+ /**
+ * array 绫诲瀷妫楠岃浆鎹㈡柟娉
+ *
+ * @param value json value
+ * @return List value
+ */
+ private static Map checkObject(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return (Map) value;
+ }
+
+ throw new VoipRelationException("can not cast to map, value : " + value);
+ }
+
+ /**
+ * array 绫诲瀷妫楠岃浆鎹㈡柟娉
+ *
+ * @param value json value
+ * @return List value
+ */
+ private static List checkArray(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof List) {
+ return (List) value;
+ }
+
+ throw new VoipRelationException("can not cast to List, value : " + value);
+ }
+
+ private static Long checkLong(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToLong(value);
+ }
+
+ /**
+ * long 绫诲瀷妫楠岃浆鎹㈡柟娉,鑻ヤ负绌鸿繑鍥炲熀纭鍊
+ *
+ * @param value json value
+ * @return Long value
+ */
+ public static long checkLongValue(Object value) {
+
+ Long longVal = TypeUtils.castToLong(value);
+
+ if (longVal == null) {
+ return 0L;
+ }
+
+ return longVal;
+ }
+
+ /**
+ * Double 绫诲瀷鏍¢獙杞崲鏂规硶
+ *
+ * @param value json value
+ * @return Double value
+ */
+ private static Double checkDouble(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToDouble(value);
+ }
+
+
+ private static Integer checkInt(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToInt(value);
+ }
+
+
+ /**
+ * int 绫诲瀷妫楠岃浆鎹㈡柟娉,鑻ヤ负绌鸿繑鍥炲熀纭鍊
+ *
+ * @param value json value
+ * @return int value
+ */
+ private static int getIntValue(Object value) {
+
+ Integer intVal = TypeUtils.castToInt(value);
+
+ if (intVal == null) {
+ return 0;
+ }
+ return intVal;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
new file mode 100644
index 0000000..1e0f156
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
@@ -0,0 +1,180 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.VoipRelationConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.exception.VoipRelationException;
+
+
+/**
+ * @author qidaijie
+ * @Package PACKAGE_NAME
+ * @Description:
+ * @date 2021/7/1218:20
+ */
+public class TypeUtils {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Integer 绫诲瀷鍒ゆ柇鏂规硶
+ *
+ * @param value json value
+ * @return Integer value or null
+ */
+ public static Object castToIfFunction(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof String) {
+ return value.toString();
+ }
+
+ if (value instanceof Integer) {
+ return ((Number) value).intValue();
+ }
+
+ if (value instanceof Long) {
+ return ((Number) value).longValue();
+ }
+
+// if (value instanceof Map) {
+// return (Map) value;
+// }
+//
+// if (value instanceof List) {
+// return Collections.singletonList(value.toString());
+// }
+
+ if (value instanceof Boolean) {
+ return (Boolean) value ? 1 : 0;
+ }
+
+ throw new VoipRelationException("can not cast to int, value : " + value);
+ }
+
+ /**
+ * Integer 绫诲瀷鍒ゆ柇鏂规硶
+ *
+ * @param value json value
+ * @return Integer value or null
+ */
+ static Integer castToInt(Object value) {
+
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Integer) {
+ return (Integer) value;
+ }
+
+ //姝ゅ垽鏂暟鍊艰秴鑼冨洿涓嶆姏鍑哄紓甯革紝浼氭埅鍙栨垚瀵瑰簲绫诲瀷鏁板
+// if (value instanceof Number) {
+// return ((Number) value).intValue();
+// }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //灏 10,20 绫绘暟鎹浆鎹负10
+ if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Integer.parseInt(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Integer Error,The error Str is:" + strVal);
+ }
+ }
+
+ if (value instanceof Boolean) {
+ return (Boolean) value ? 1 : 0;
+ }
+
+ throw new VoipRelationException("can not cast to int, value : " + value);
+ }
+
+ /**
+ * Double绫诲瀷鍒ゆ柇鏂规硶
+ *
+ * @param value json value
+ * @return double value or null
+ */
+ static Double castToDouble(Object value) {
+
+ if (value instanceof Double) {
+ return (Double) value;
+ }
+
+ //姝ゅ垽鏂暟鍊艰秴鑼冨洿涓嶆姏鍑哄紓甯革紝浼氭埅鍙栨垚瀵瑰簲绫诲瀷鏁板
+// if (value instanceof Number) {
+// return ((Number) value).doubleValue();
+// }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //灏 10,20 绫绘暟鎹浆鎹负10
+ if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Double.parseDouble(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Double Error,The error Str is:" + strVal);
+ }
+ }
+
+ throw new VoipRelationException("can not cast to double, value : " + value);
+ }
+
+ /**
+ * Long绫诲瀷鍒ゆ柇鏂规硶
+ *
+ * @param value json value
+ * @return (Long)value or null
+ */
+ static Long castToLong(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+// 姝ゅ垽鏂暟鍊艰秴鑼冨洿涓嶆姏鍑哄紓甯革紝浼氭埅鍙栨垚瀵瑰簲绫诲瀷鏁板
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //灏 10,20 绫绘暟鎹浆鎹负10
+ if (strVal.contains(VoipRelationConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(VoipRelationConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Long.parseLong(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Long Error,The error Str is:" + strVal);
+ }
+ }
+
+ throw new VoipRelationException("can not cast to long, value : " + value);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
new file mode 100644
index 0000000..83a39ad
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.VoipRelationConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/9/610:37
+ */
+class CertUtils {
+ static void chooseCert(String type, Properties properties) {
+ switch (type) {
+ case "SSL":
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", VoipRelationConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", VoipRelationConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", VoipRelationConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", VoipRelationConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", VoipRelationConfig.KAFKA_PIN);
+ break;
+ case "SASL":
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + VoipRelationConfig.KAFKA_USER + " password=" + VoipRelationConfig.KAFKA_PIN + ";");
+ break;
+ default:
+ }
+
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
new file mode 100644
index 0000000..6dde52a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -0,0 +1,40 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.VoipRelationConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/813:54
+ */
+public class Consumer {
+ private static Properties createConsumerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", VoipRelationConfig.INPUT_KAFKA_SERVERS);
+ properties.put("group.id", VoipRelationConfig.GROUP_ID);
+ properties.put("session.timeout.ms", "60000");
+ properties.put("max.poll.records", 3000);
+ properties.put("max.partition.fetch.bytes", 31457280);
+ properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+ CertUtils.chooseCert(VoipRelationConfig.KAFKA_SOURCE_PROTOCOL,properties);
+ return properties;
+ }
+
+ public static FlinkKafkaConsumer getKafkaConsumer() {
+ FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(VoipRelationConfig.INPUT_KAFKA_TOPIC,
+ new SimpleStringSchema(), createConsumerConfig());
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
new file mode 100644
index 0000000..bf63098
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
@@ -0,0 +1,50 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.VoipRelationConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/814:04
+ */
+public class Producer {
+
+ private static Properties createProducerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", VoipRelationConfig.OUTPUT_KAFKA_SERVERS);
+ properties.put("acks", VoipRelationConfig.PRODUCER_ACK);
+ properties.put("retries", VoipRelationConfig.RETRIES);
+ properties.put("linger.ms", VoipRelationConfig.LINGER_MS);
+ properties.put("request.timeout.ms", VoipRelationConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", VoipRelationConfig.BATCH_SIZE);
+ properties.put("buffer.memory", VoipRelationConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", VoipRelationConfig.MAX_REQUEST_SIZE);
+ properties.put("compression.type", VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+ CertUtils.chooseCert(VoipRelationConfig.KAFKA_SINK_PROTOCOL, properties);
+
+ return properties;
+ }
+
+
+ public static FlinkKafkaProducer getKafkaProducer() {
+ FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(
+ VoipRelationConfig.OUTPUT_KAFKA_TOPIC,
+ new SimpleStringSchema(),
+ createProducerConfig(), Optional.empty());
+
+ //鍚敤姝ら夐」灏嗕娇鐢熶骇鑰呬粎璁板綍澶辫触鏃ュ織鑰屼笉鏄崟鑾峰拰閲嶆柊鎶涘嚭瀹冧滑
+ kafkaProducer.setLogFailuresOnly(false);
+
+ //鍐欏叆kafka鐨勬秷鎭惡甯︽椂闂存埑
+// kafkaProducer.setWriteTimestampToKafka(true);
+
+ return kafkaProducer;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/system/VoipRelationConfigurations.java b/src/main/java/com/zdjizhi/utils/system/VoipRelationConfigurations.java
new file mode 100644
index 0000000..95e322b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/system/VoipRelationConfigurations.java
@@ -0,0 +1,71 @@
+package com.zdjizhi.utils.system;
+
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.exception.VoipRelationException;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class VoipRelationConfigurations {
+
+ private static Properties propDefault = new Properties();
+ private static Properties propService = new Properties();
+
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propService.getProperty(key);
+ } else if (type == 1) {
+ return propDefault.getProperty(key);
+ } else {
+ return null;
+ }
+
+ }
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propService.getProperty(key));
+ } else if (type == 1) {
+ return Integer.parseInt(propDefault.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propService.getProperty(key));
+ } else if (type == 1) {
+ return Long.parseLong(propDefault.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
+ } else if (type == 1) {
+ return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propService.load(VoipRelationConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ propDefault.load(VoipRelationConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+ } catch (IOException | VoipRelationException e) {
+ propDefault = null;
+ propService = null;
+ }
+ }
+}
diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties
new file mode 100644
index 0000000..9d91936
--- /dev/null
+++ b/src/main/java/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=info,console,file
+# 鎺у埗鍙版棩蹇楄缃
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# 鏂囦欢鏃ュ織璁剧疆
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+#璺緞璇风敤鐩稿璺緞锛屽仛濂界浉鍏虫祴璇曡緭鍑哄埌搴旂敤鐩笅
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+#MyBatis 閰嶇疆锛宑om.nis.web.dao鏄痬ybatis鎺ュ彛鎵鍦ㄥ寘
+log4j.logger.com.nis.web.dao=debug
+#bonecp鏁版嵁婧愰厤缃
+log4j.category.com.jolbox=debug,console
+
+