diff --git a/pom.xml b/pom.xml
index 1491d5e..64defb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
log-stream-voip-relation
- 210908-security
+ 220309-inner
log-stream-voip-relation
http://www.example.com
@@ -122,7 +122,7 @@
com.zdjizhi
galaxy
- 1.0.6
+ 1.0.8
slf4j-log4j12
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 2b9bfb1..772658e 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -1,29 +1,41 @@
-#producer重试的次数设置
+#====================Kafka KafkaConsumer====================#
+#kafka source connection timeout
+session.timeout.ms=60000
+
+#kafka source poll
+max.poll.records=3000
+
+#kafka source poll bytes
+max.partition.fetch.bytes=31457280
+#====================Kafka KafkaProducer====================#
+#producer閲嶈瘯鐨勬鏁拌缃
retries=0
-#他的含义就是说一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了
-linger.ms=5
+#浠栫殑鍚箟灏辨槸璇翠竴涓狟atch琚垱寤轰箣鍚庯紝鏈澶氳繃澶氫箙锛屼笉绠¤繖涓狟atch鏈夋病鏈夊啓婊★紝閮藉繀椤诲彂閫佸嚭鍘讳簡
+linger.ms=10
-#如果在超时之前未收到响应,客户端将在必要时重新发送请求
+#濡傛灉鍦ㄨ秴鏃朵箣鍓嶆湭鏀跺埌鍝嶅簲锛屽鎴风灏嗗湪蹇呰鏃堕噸鏂板彂閫佽姹
request.timeout.ms=30000
-#producer都是按照batch进行发送的,批次大小,默认:16384
+#producer閮芥槸鎸夌収batch杩涜鍙戦佺殑,鎵规澶у皬锛岄粯璁:16384
batch.size=262144
-#Producer端用于缓存消息的缓冲区大小
-buffer.memory=67108864
+#Producer绔敤浜庣紦瀛樻秷鎭殑缂撳啿鍖哄ぇ灏
+#128M
+buffer.memory=134217728
-#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576
-max.request.size=5242880
-
-#kafka SASL验证用户名
+#杩欎釜鍙傛暟鍐冲畾浜嗘瘡娆″彂閫佺粰Kafka鏈嶅姟鍣ㄨ姹傜殑鏈澶уぇ灏,榛樿1048576
+#10M
+max.request.size=10485760
+#====================kafka default====================#
+#kafka SASL楠岃瘉鐢ㄦ埛鍚
kafka.user=admin
-#kafka SASL及SSL验证密码
+#kafka SASL鍙奡SL楠岃瘉瀵嗙爜
kafka.pin=galaxy2019
-#kafka source protocol; SSL or SASL
-kafka.source.protocol=SASL
+#====================Topology Default====================#
+#check ip is Inner network;0 off, 1 on.
+check.inner.network=1
+
-#kafka sink protocol; SSL or SASL
-kafka.sink.protocol=SASL
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 939b6d8..2b49df7 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,28 +1,28 @@
#--------------------------------鍦板潃閰嶇疆------------------------------#
#绠$悊kafka鍦板潃
-input.kafka.servers=192.168.44.12:9092
+source.kafka.servers=192.168.44.12:9094
#绠$悊杈撳嚭kafka鍦板潃
-output.kafka.servers=192.168.44.12:9092
+sink.kafka.servers=192.168.44.12:9094
#--------------------------------HTTP------------------------------#
#瀹氫綅搴撳湴鍧
-tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+tools.library=D:\\workerspace\\dat
#缃戝叧鐨剆chema浣嶇疆
-schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/voip_record_log
+schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/voip_record
#--------------------------------Kafka娑堣垂缁勪俊鎭------------------------------#
#kafka 鎺ユ敹鏁版嵁topic
-input.kafka.topic=VOIP-RECORD
+source.kafka.topic=test
#琛ュ叏鏁版嵁 杈撳嚭 topic
-output.kafka.topic=VOIP-CONVERSATION-RECORD
+sink.kafka.topic=test-result
#璇诲彇topic,瀛樺偍璇pout id鐨勬秷璐筼ffset淇℃伅锛屽彲閫氳繃璇ユ嫇鎵戝懡鍚;鍏蜂綋瀛樺偍offset鐨勪綅缃紝纭畾涓嬫璇诲彇涓嶉噸澶嶇殑鏁版嵁锛
-group.id=voip-relation-log-20210625-1
+group.id=mytest-1
#鐢熶骇鑰呭帇缂╂ā寮 none or snappy
producer.kafka.compression.type=none
@@ -36,10 +36,10 @@ producer.ack=1
window.parallelism=1
#voip鏃ュ織瀵瑰噯绐楀彛鏃堕棿 seconds
-voip.calibration.window.time=15
+voip.calibration.window.time=30
#鍗曞悜娴佸鍑嗙獥鍙f椂闂 seconds
-one.sided.window.time=60
+one.sided.window.time=5
#voip浜屾瀵瑰噯鏃堕棿 seconds
-sec.combine.sr.cache.secs=300
\ No newline at end of file
+sec.combine.sr.cache.secs=180
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
index abd41c7..1a2f78e 100644
--- a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
+++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java
@@ -26,16 +26,23 @@ public class VoipRelationConfig {
/**
* connection kafka
*/
- public static final String INPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "input.kafka.servers");
- public static final String OUTPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "output.kafka.servers");
- public static final String INPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "input.kafka.topic");
- public static final String OUTPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "output.kafka.topic");
+ public static final String SOURCE_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "source.kafka.servers");
+ public static final String SINK_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "sink.kafka.servers");
+ public static final String SOURCE_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "source.kafka.topic");
+ public static final String SINK_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "sink.kafka.topic");
public static final String GROUP_ID = VoipRelationConfigurations.getStringProperty(0, "group.id");
public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(0, "producer.ack");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(0, "producer.kafka.compression.type");
public static final String KAFKA_USER = VoipRelationConfigurations.getStringProperty(1, "kafka.user");
public static final String KAFKA_PIN = VoipRelationConfigurations.getStringProperty(1, "kafka.pin");
+ /**
+ * kafka source config
+ */
+ public static final String SESSION_TIMEOUT_MS = VoipRelationConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = VoipRelationConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = VoipRelationConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+
/**
* kafka sink
*/
@@ -58,5 +65,6 @@ public class VoipRelationConfig {
* voip
*/
public static final Integer SEC_COMBINE_SR_CACHE_SECS = VoipRelationConfigurations.getIntProperty(0, "sec.combine.sr.cache.secs");
+ public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(1, "check.inner.network");
}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
index 37b2f18..80a80ad 100644
--- a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
+++ b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java
@@ -4,17 +4,14 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.functions.*;
-import com.zdjizhi.utils.kafka.Consumer;
-import com.zdjizhi.utils.kafka.Producer;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
@@ -29,9 +26,7 @@ public class VoIpRelationTopology {
public static void main(String[] args) {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-// environment.enableCheckpointing(5000);
-
- DataStream streamSource = environment.addSource(Consumer.getKafkaConsumer());
+ DataStream streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer());
SingleOutputStreamOperator> sipCorrelation =
streamSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME)))
@@ -39,26 +34,14 @@ public class VoIpRelationTopology {
SingleOutputStreamOperator window = sipCorrelation.windowAll(
TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME)))
- .process(new SipCalibrationWindow()).name("SipCalibrationWindow");
+ .process(new SipCalibrationWindowFunction()).name("SipCalibrationWindowFunction");
- window.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka");
-
-// KeyedStream, String> keyedStream = sipCorrelation.keyBy(new KeyByFunction());
-//
-// WindowedStream, String, TimeWindow> window =
-// keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME)));
-//
-// SingleOutputStreamOperator output = window.process(new SipCalibrationWindowFunction())
-// .name("SipCalibrationWindow").setParallelism(VoipRelationConfig.WINDOW_PARALLELISM);
-
-// output.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka");
+ window.addSink(KafkaProducer.getKafkaProducer()).name("VoIpLogSinkKafka");
try {
- environment.execute("VOIP-RELATION");
+ environment.execute(args[0]);
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :" + e);
}
-
}
-
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
deleted file mode 100644
index 0b00b3c..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/7/2112:13
- */
-public class KeyByFunction implements KeySelector, String> {
-
- @Override
- public String getKey(Tuple3 value) throws Exception {
- //浠ap鎷兼帴鐨刱ey鍒嗙粍
- return value.f1;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
index 38e6bdd..061fa12 100644
--- a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java
@@ -41,7 +41,6 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction("", "violation", input));
}
}
@@ -364,4 +367,10 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction, String, TimeWindow> {
- private static final Log logger = LogFactory.get();
-
- /**
- * 瀹炰綋绫诲弽灏刴ap
- */
- private static HashMap classMap = JsonParseUtil.getMapFromHttp(VoipRelationConfig.SCHEMA_HTTP);
-
- /**
- * 鍙嶅皠鎴愪竴涓被
- */
- private static Object voipObject = JsonParseUtil.generateObject(classMap);
-
- /**
- * 鍏宠仈鐢℉ashMap
- * key---鍥涘厓缁
- * value---List瀛樻斁瀵瑰簲SIP鎴栬匯TP鏁版嵁
- * 瀛樻斁鏁版嵁:rtp-single,rtp-two,sip-two
- * 涓嶅瓨鏀剧殑鏁版嵁:sip-single涓巗ip-in
- */
- private static HashMap> combineSRHmList = new HashMap<>(16);
-
- /**
- * 浜屾鍏宠仈鐢℉ashMap
- * key---鍥涘厓缁
- * value---List瀛樻斁瀵瑰簲SIP鎴栬匯TP鏁版嵁
- * 瀛樻斁鏁版嵁:rtp-single,rtp-two,sip-two
- * 涓嶅瓨鏀剧殑鏁版嵁:sip-single涓巗ip-in
- */
- private static HashMap> secCombineSRHmList = new HashMap<>(16);
-
- @Override
- public void process(Context context, Iterable> input, Collector output) throws Exception {
- for (Tuple3 tuple : input) {
- //鎷兼帴鐨勫洓鍏冪粍
- String fourKey = tuple.f0;
- //宸插叧鑱旂殑sip,rtp;鏈叧鑱旂殑sip,rtp;鍐呯綉鐨剆ip
- String type = tuple.f1;
- String msg = tuple.f2;
- switch (type) {
- //鍗曞悜娴佸鍑嗗悗鐨凷IP
- case "sip-two":
- //鍗曞悜娴佸鍑嗗悗鐨凴TP
- case "rtp-two":
- //瀵逛笉涓婄殑RTP
- case "rtp-single":
- putKeyAndMsg(msg, fourKey, combineSRHmList);
- break;
- //鍗曞悜娴佺殑SIP
- case "sip-single":
- //鍐呯綉鐨凷IP
- case "sip-in":
- output.collect(msg);
- break;
- default:
- logger.error("type is beyond expectation:" + type);
- break;
-
- }
- }
- //鍒濇鍏宠仈
- tickCombineHmList(combineSRHmList, output);
- //鍜岀紦瀛樹腑鐨勬暟鎹簩娆″叧鑱
- tickCombineHmList(secCombineSRHmList, output);
- }
-
-
- /**
- * 瀹氭椂鍏宠仈,鍖呮嫭鍒濇鍏宠仈浠ュ強鍚庣画浜屾鍏宠仈
- *
- * @param combineHmList
- */
- private void tickCombineHmList(HashMap> combineHmList, Collector output) {
- if (combineHmList.size() > 0) {
- long nowTime = System.currentTimeMillis() / 1000;
-
- HashMap> tempCombineSRhmList = new HashMap<>(combineHmList);
- combineHmList.clear();
-
- for (String fourStrKey : tempCombineSRhmList.keySet()) {
-
- LinkedList tempList = tempCombineSRhmList.get(fourStrKey);
- //鍖呭惈SIP鍜孯TP
- int listSize = tempList.size();
- System.out.println(listSize);
- if (listSize > 1) {
-
- List sipBeanArr = new ArrayList<>();
- List rtpBeanArr = new ArrayList<>();
-
- for (String message : tempList) {
- Object tempSipOrRtpLog = JSONObject.parseObject(message, voipObject.getClass());
- String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE);
- if (JsonProConfig.SIP_MARK.equals(schemaType)) {
- sipBeanArr.add(message);
- } else if (JsonProConfig.RTP_MARK.equals(schemaType)) {
- rtpBeanArr.add(message);
- }
- }
- int rtpSize = rtpBeanArr.size();
- int sipSize = sipBeanArr.size();
-
- if (rtpSize == 1 && sipSize >= 1) {
- for (String sipMessage : sipBeanArr) {
- Object rtpLog = JSONObject.parseObject(rtpBeanArr.get(0), voipObject.getClass());
- Object voipLog = JSONObject.parseObject(sipMessage, voipObject.getClass());
- accumulateVoipMsg(voipLog, rtpLog);
- JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
- //鍥涘厓缁,voip,鍏宠仈鍚庣殑鏁版嵁
- output.collect(mergeJson(voipLog, rtpLog));
-// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
- }
- } else if (sipSize == 1 && rtpSize >= 1) {
- for (String rtpMessage : rtpBeanArr) {
- Object rtpLog = JSONObject.parseObject(rtpMessage, voipObject.getClass());
- Object voipLog = JSONObject.parseObject(sipBeanArr.get(0), voipObject.getClass());
- accumulateVoipMsg(voipLog, rtpLog);
- JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP");
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog));
- //鍥涘厓缁,voip,鍏宠仈鍚庣殑鏁版嵁
- output.collect(mergeJson(voipLog, rtpLog));
-// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog)));
- }
- } else {
- logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! Not logical");
- sendErrorLogToKafka(sipBeanArr, output);
- sendErrorLogToKafka(rtpBeanArr, output);
- }
-
- } else {
- String msg = tempList.get(0);
-
- Object voipLog = JSONObject.parseObject(msg, voipObject.getClass());
- long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME);
- long intervalTime = nowTime - commonEndTime;
- if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) {
- putKeyAndMsg(msg, fourStrKey, secCombineSRHmList);
- } else {
- sendDirectlyOneElement(msg, voipLog, output);
- }
-
- }
- }
- }
- }
-
- /**
- * 绱姞鍏宠仈鍚庣殑瀛楄妭绫诲弬鏁板
- *
- * @param voipLog
- * @param rtpLog
- */
- private void accumulateVoipMsg(Object voipLog, Object rtpLog) {
-
- Long sumCommonSessions = JsonParseUtil.getLong(voipLog, JsonProConfig.SESSIONS)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.SESSIONS);
-
- Long sumCommonC2sPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_PKT_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_PKT_NUM);
-
- Long sumCommonS2cPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_PKT_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_PKT_NUM);
-
- Long sumCommonC2sByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_BYTE_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_BYTE_NUM);
-
- Long sumCommonS2cByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_BYTE_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_BYTE_NUM);
-
- Long sumCommonC2sIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_IPFRAG_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_IPFRAG_NUM);
-
- Long sumCommonS2cIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_IPFRAG_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_IPFRAG_NUM);
-
- Long sumCommonC2sTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_LOSTLEN)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_LOSTLEN);
-
- Long sumCommonS2cTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_LOSTLEN)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_LOSTLEN);
-
- Long sumCommonC2sTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM);
-
- Long sumCommonS2cTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM)
- + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM);
-
- //common_sessions
- JsonParseUtil.setValue(voipLog, JsonProConfig.SESSIONS, sumCommonSessions);
-
- //common_c2s_pkt_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_PKT_NUM, sumCommonC2sPktNum);
- //common_s2c_pkt_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_PKT_NUM, sumCommonS2cPktNum);
- //common_c2s_byte_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_BYTE_NUM, sumCommonC2sByteNum);
- //common_s2c_byte_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_BYTE_NUM, sumCommonS2cByteNum);
-
- //common_c2s_ipfrag_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_IPFRAG_NUM, sumCommonC2sIpfragNum);
- //common_s2c_ipfrag_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_IPFRAG_NUM, sumCommonS2cIpfragNum);
-
- //common_c2s_tcp_lostlen
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_LOSTLEN, sumCommonC2sTcpLostlen);
- //common_s2c_tcp_lostlen
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_LOSTLEN, sumCommonS2cTcpLostlen);
-
- //common_c2s_tcp_unorder_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM, sumCommonC2sTcpUnorderNum);
- //common_s2c_tcp_unorder_num
- JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM, sumCommonS2cTcpUnorderNum);
-
- }
-
- /**
- * 瀹氭椂澶勭悊涓璍ist鍏冪礌鏁颁粎涓1鐨勬儏鍐
- */
- private void sendDirectlyOneElement(String msg, Object voipLog, Collector output) {
- //鍥涘厓缁,sip(涓瀹氫负鍙屼晶)/rtp(鍙兘涓哄崟渚т篃鍙兘涓哄弻渚,鐪嬪崟鍚戞祦瀛楁淇℃伅),鎷垮嚭鏉ョ殑鍘熷鏁版嵁
- String commonSchemaType = JsonParseUtil.getString(voipLog, JsonProConfig.SCHEMA_TYPE);
- if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) {
- output.collect(msg);
- } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) {
- int commonStreamDir = JsonParseUtil.getInteger(voipLog, JsonProConfig.STREAM_DIR);
- if (commonStreamDir != JsonProConfig.DOUBLE) {
- output.collect(msg);
- } else {
- output.collect(msg);
- }
- }
- }
-
-
- /**
- * 瀛樻斁key骞舵坊鍔犲搴擫ist
- */
- private static void putKeyAndMsg(String message, String fourStrKey, HashMap> combineSRHmList) {
- if (combineSRHmList.containsKey(fourStrKey)) {
- LinkedList tmpList = combineSRHmList.get(fourStrKey);
- tmpList.add(message);
- combineSRHmList.put(fourStrKey, tmpList);
- } else {
- LinkedList tmpList = new LinkedList<>();
- tmpList.add(message);
- combineSRHmList.put(fourStrKey, tmpList);
- }
- }
-
- /**
- * 鍒ゆ柇RTP涓诲彨鏂瑰悜-娴嬭瘯
- *
- * @param rtpLog RTP鍘熷鏃ュ織
- * @param voipLog 铻嶅悎鍚嶸OIP鏃ュ織
- * @return 鏂瑰悜 0锛氭湭鐭 1锛歝2s 2锛歴2c
- */
- private static int judgeDirection(Object rtpLog, Object voipLog) {
-
- String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP);
- String sipOriginatorIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_ORIGINATOR_IP);
- String sipResponderIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_RESPONDER_IP);
-
- if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) {
- return 1;
- } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) {
- return 2;
- }
-
- return 0;
- }
-
- /**
- * 鍙戦佷笉绗﹀悎閫昏緫鐨勬棩蹇楀埌kafka
- */
- private static void sendErrorLogToKafka(List logList, Collector output) {
- if (logList.size() > 0) {
- for (String log : logList) {
- output.collect(log);
- }
- }
- }
-
- /**
- *
- */
- private static String mergeJson(Object voipLog, Object rtpLog) {
-
- int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S);
- int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C);
- String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH);
-
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s);
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c);
- JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath);
-
- return JSONObject.toJSONString(voipLog);
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
index 2215d5b..3718775 100644
--- a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java
@@ -8,11 +8,14 @@ import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
/**
* @author qidaijie
@@ -20,7 +23,7 @@ import java.util.*;
* @Description:
* @date 2021/7/2113:55
*/
-public class SipCalibrationWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> {
+public class SipCalibrationWindowFunction extends ProcessAllWindowFunction, String, TimeWindow> {
private static final Log logger = LogFactory.get();
/**
@@ -52,8 +55,8 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction> secCombineSRHmList = new HashMap<>(16);
@Override
- @SuppressWarnings("unchecked")
- public void process(String key, Context context, Iterable> input, Collector output) throws Exception {
+ public void process(Context context, Iterable> input, Collector output) throws Exception {
+ logger.error("windowall绐楀彛杩愯");
for (Tuple3 tuple : input) {
//鎷兼帴鐨勫洓鍏冪粍
String fourKey = tuple.f0;
@@ -63,16 +66,17 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction tempList = tempCombineSRhmList.get(fourStrKey);
//鍖呭惈SIP鍜孯TP
int listSize = tempList.size();
- System.out.println(listSize);
if (listSize > 1) {
List sipBeanArr = new ArrayList<>();
@@ -156,12 +160,12 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction getKafkaConsumer() {
- FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(VoipRelationConfig.INPUT_KAFKA_TOPIC,
+ FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(VoipRelationConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
similarity index 87%
rename from src/main/java/com/zdjizhi/utils/kafka/Producer.java
rename to src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index bf63098..65e253a 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -13,11 +13,11 @@ import java.util.Properties;
* @Description:
* @date 2021/6/814:04
*/
-public class Producer {
+public class KafkaProducer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
- properties.put("bootstrap.servers", VoipRelationConfig.OUTPUT_KAFKA_SERVERS);
+ properties.put("bootstrap.servers", VoipRelationConfig.SINK_KAFKA_SERVERS);
properties.put("acks", VoipRelationConfig.PRODUCER_ACK);
properties.put("retries", VoipRelationConfig.RETRIES);
properties.put("linger.ms", VoipRelationConfig.LINGER_MS);
@@ -27,7 +27,7 @@ public class Producer {
properties.put("max.request.size", VoipRelationConfig.MAX_REQUEST_SIZE);
properties.put("compression.type", VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
- CertUtils.chooseCert(VoipRelationConfig.KAFKA_SINK_PROTOCOL, properties);
+ CertUtils.chooseCert(VoipRelationConfig.SINK_KAFKA_SERVERS, properties);
return properties;
}
@@ -35,7 +35,7 @@ public class Producer {
public static FlinkKafkaProducer getKafkaProducer() {
FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(
- VoipRelationConfig.OUTPUT_KAFKA_TOPIC,
+ VoipRelationConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
createProducerConfig(), Optional.empty());