From 00cfc1a1138b2cc8bf05c3797a062d053d52c679 Mon Sep 17 00:00:00 2001
From: qidaijie
Date: Wed, 9 Mar 2022 10:14:59 +0800
Subject: [PATCH] Optimize the Kafka authentication method: remove the
 protocol config items and determine the protocol from the connection port
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pom.xml                                            |   4 +-
 properties/default_config.properties               |  44 ++-
 properties/service_flow_config.properties          |  20 +-
 .../zdjizhi/common/VoipRelationConfig.java         |  16 +-
 .../topology/VoIpRelationTopology.java             |  29 +-
 .../utils/functions/KeyByFunction.java             |  19 -
 .../functions/OneSidedWindowFunction.java          |  17 +-
 .../utils/functions/SipCalibrationWindow.java      | 327 ------------------
 .../SipCalibrationWindowFunction.java              |  29 +-
 .../java/com/zdjizhi/utils/ip/IPUtils.java         |  23 +-
 .../com/zdjizhi/utils/kafka/CertUtils.java         |  48 ++-
 .../{Consumer.java => KafkaConsumer.java}          |  14 +-
 .../{Producer.java => KafkaProducer.java}          |   8 +-
 13 files changed, 142 insertions(+), 456 deletions(-)
 delete mode 100644 src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
 delete mode 100644 src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindow.java
 rename src/main/java/com/zdjizhi/utils/kafka/{Consumer.java => KafkaConsumer.java} (69%)
 rename src/main/java/com/zdjizhi/utils/kafka/{Producer.java => KafkaProducer.java} (87%)

diff --git a/pom.xml b/pom.xml
index 1491d5e..64defb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
 com.zdjizhi
 log-stream-voip-relation
-210908-security
+220309-inner
 log-stream-voip-relation
 http://www.example.com
@@ -122,7 +122,7 @@
 com.zdjizhi
 galaxy
-1.0.6
+1.0.8
 slf4j-log4j12

diff --git a/properties/default_config.properties b/properties/default_config.properties
index 2b9bfb1..772658e 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -1,29 +1,41 @@
-#number of retries for the producer
+#====================Kafka KafkaConsumer====================#
+#kafka source connection timeout
+session.timeout.ms=60000
+
+#kafka source poll
+max.poll.records=3000
+
+#kafka source poll bytes
+max.partition.fetch.bytes=31457280
+#====================Kafka KafkaProducer====================#
+#number of retries for the producer
 retries=0
-#maximum time after a batch is created before it must be sent, whether or not it is full
-linger.ms=5
+#maximum time after a batch is created before it must be sent, whether or not it is full
+linger.ms=10
-#if no response is received before the timeout, the client will resend the request if necessary
+#if no response is received before the timeout, the client will resend the request if necessary
 request.timeout.ms=30000
-#the producer sends in batches; batch size, default: 16384
+#the producer sends in batches; batch size, default: 16384
 batch.size=262144
-#size of the buffer the producer uses to cache messages
-buffer.memory=67108864
+#size of the buffer the producer uses to cache messages
+#128M
+buffer.memory=134217728
-#maximum size of each request sent to the Kafka broker, default 1048576
-max.request.size=5242880
-
-#kafka SASL username
+#maximum size of each request sent to the Kafka broker, default 1048576
+#10M
+max.request.size=10485760
+#====================kafka default====================#
+#kafka SASL username
 kafka.user=admin
-#kafka SASL and SSL password
+#kafka SASL and SSL password
 kafka.pin=galaxy2019
-#kafka source protocol; SSL or SASL
-kafka.source.protocol=SASL
+#====================Topology Default====================#
+#check ip is Inner network;0 off, 1 on.
+check.inner.network=1 + -#kafka sink protocol; SSL or SASL -kafka.sink.protocol=SASL \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 939b6d8..2b49df7 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,28 +1,28 @@ #--------------------------------鍦板潃閰嶇疆------------------------------# #绠$悊kafka鍦板潃 -input.kafka.servers=192.168.44.12:9092 +source.kafka.servers=192.168.44.12:9094 #绠$悊杈撳嚭kafka鍦板潃 -output.kafka.servers=192.168.44.12:9092 +sink.kafka.servers=192.168.44.12:9094 #--------------------------------HTTP------------------------------# #瀹氫綅搴撳湴鍧 -tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ +tools.library=D:\\workerspace\\dat #缃戝叧鐨剆chema浣嶇疆 -schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/voip_record_log +schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/voip_record #--------------------------------Kafka娑堣垂缁勪俊鎭------------------------------# #kafka 鎺ユ敹鏁版嵁topic -input.kafka.topic=VOIP-RECORD +source.kafka.topic=test #琛ュ叏鏁版嵁 杈撳嚭 topic -output.kafka.topic=VOIP-CONVERSATION-RECORD +sink.kafka.topic=test-result #璇诲彇topic,瀛樺偍璇pout id鐨勬秷璐筼ffset淇℃伅锛屽彲閫氳繃璇ユ嫇鎵戝懡鍚;鍏蜂綋瀛樺偍offset鐨勪綅缃紝纭畾涓嬫璇诲彇涓嶉噸澶嶇殑鏁版嵁锛 -group.id=voip-relation-log-20210625-1 +group.id=mytest-1 #鐢熶骇鑰呭帇缂╂ā寮 none or snappy producer.kafka.compression.type=none @@ -36,10 +36,10 @@ producer.ack=1 window.parallelism=1 #voip鏃ュ織瀵瑰噯绐楀彛鏃堕棿 seconds -voip.calibration.window.time=15 +voip.calibration.window.time=30 #鍗曞悜娴佸鍑嗙獥鍙f椂闂 seconds -one.sided.window.time=60 +one.sided.window.time=5 #voip浜屾瀵瑰噯鏃堕棿 seconds -sec.combine.sr.cache.secs=300 \ No newline at end of file +sec.combine.sr.cache.secs=180 \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java index abd41c7..1a2f78e 100644 --- a/src/main/java/com/zdjizhi/common/VoipRelationConfig.java +++ b/src/main/java/com/zdjizhi/common/VoipRelationConfig.java @@ -26,16 +26,23 @@ public class VoipRelationConfig { /** * connection kafka */ - public static final String INPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "input.kafka.servers"); - public static final String OUTPUT_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "output.kafka.servers"); - public static final String INPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "input.kafka.topic"); - public static final String OUTPUT_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "output.kafka.topic"); + public static final String SOURCE_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "source.kafka.servers"); + public static final String SINK_KAFKA_SERVERS = VoipRelationConfigurations.getStringProperty(0, "sink.kafka.servers"); + public static final String SOURCE_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "source.kafka.topic"); + public static final String SINK_KAFKA_TOPIC = VoipRelationConfigurations.getStringProperty(0, "sink.kafka.topic"); public static final String GROUP_ID = VoipRelationConfigurations.getStringProperty(0, "group.id"); public static final String PRODUCER_ACK = VoipRelationConfigurations.getStringProperty(0, "producer.ack"); public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = VoipRelationConfigurations.getStringProperty(0, "producer.kafka.compression.type"); public static final String KAFKA_USER = VoipRelationConfigurations.getStringProperty(1, 
"kafka.user"); public static final String KAFKA_PIN = VoipRelationConfigurations.getStringProperty(1, "kafka.pin"); + /** + * kafka source config + */ + public static final String SESSION_TIMEOUT_MS = VoipRelationConfigurations.getStringProperty(1, "session.timeout.ms"); + public static final String MAX_POLL_RECORDS = VoipRelationConfigurations.getStringProperty(1, "max.poll.records"); + public static final String MAX_PARTITION_FETCH_BYTES = VoipRelationConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); + /** * kafka sink */ @@ -58,5 +65,6 @@ public class VoipRelationConfig { * voip */ public static final Integer SEC_COMBINE_SR_CACHE_SECS = VoipRelationConfigurations.getIntProperty(0, "sec.combine.sr.cache.secs"); + public static final Integer CHECK_INNER_NETWORK = VoipRelationConfigurations.getIntProperty(1, "check.inner.network"); } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java index 37b2f18..80a80ad 100644 --- a/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java +++ b/src/main/java/com/zdjizhi/topology/VoIpRelationTopology.java @@ -4,17 +4,14 @@ import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.utils.functions.*; -import com.zdjizhi.utils.kafka.Consumer; -import com.zdjizhi.utils.kafka.Producer; +import com.zdjizhi.utils.kafka.KafkaConsumer; +import com.zdjizhi.utils.kafka.KafkaProducer; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; /** @@ -29,9 +26,7 @@ public class VoIpRelationTopology { public static void main(String[] args) { final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); -// environment.enableCheckpointing(5000); - - DataStream streamSource = environment.addSource(Consumer.getKafkaConsumer()); + DataStream streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()); SingleOutputStreamOperator> sipCorrelation = streamSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.ONE_SIDED_WINDOW_TIME))) @@ -39,26 +34,14 @@ public class VoIpRelationTopology { SingleOutputStreamOperator window = sipCorrelation.windowAll( TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME))) - .process(new SipCalibrationWindow()).name("SipCalibrationWindow"); + .process(new SipCalibrationWindowFunction()).name("SipCalibrationWindowFunction"); - window.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka"); - -// KeyedStream, String> keyedStream = sipCorrelation.keyBy(new KeyByFunction()); -// -// WindowedStream, String, TimeWindow> window = -// keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(VoipRelationConfig.VOIP_CALIBRATION_WINDOW_TIME))); -// -// SingleOutputStreamOperator output = window.process(new SipCalibrationWindowFunction()) -// 
.name("SipCalibrationWindow").setParallelism(VoipRelationConfig.WINDOW_PARALLELISM); - -// output.addSink(Producer.getKafkaProducer()).name("VoIpLogSinkKafka"); + window.addSink(KafkaProducer.getKafkaProducer()).name("VoIpLogSinkKafka"); try { - environment.execute("VOIP-RELATION"); + environment.execute(args[0]); } catch (Exception e) { logger.error("This Flink task start ERROR! Exception information is :" + e); } - } - } diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java deleted file mode 100644 index 0b00b3c..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.zdjizhi.utils.functions; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple3; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/7/2112:13 - */ -public class KeyByFunction implements KeySelector, String> { - - @Override - public String getKey(Tuple3 value) throws Exception { - //浠ap鎷兼帴鐨刱ey鍒嗙粍 - return value.f1; - } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java index 38e6bdd..061fa12 100644 --- a/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/OneSidedWindowFunction.java @@ -41,7 +41,6 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction("", "violation", input)); } } @@ -364,4 +367,10 @@ public class OneSidedWindowFunction extends ProcessAllWindowFunction, String, TimeWindow> { - private static final Log logger = LogFactory.get(); - - /** - * 瀹炰綋绫诲弽灏刴ap - */ - private static HashMap classMap = JsonParseUtil.getMapFromHttp(VoipRelationConfig.SCHEMA_HTTP); - - /** - * 鍙嶅皠鎴愪竴涓被 - */ - private static Object voipObject = JsonParseUtil.generateObject(classMap); - - /** - * 鍏宠仈鐢℉ashMap - * key---鍥涘厓缁 - * value---List瀛樻斁瀵瑰簲SIP鎴栬匯TP鏁版嵁 - * 瀛樻斁鏁版嵁:rtp-single,rtp-two,sip-two - * 涓嶅瓨鏀剧殑鏁版嵁:sip-single涓巗ip-in - */ - private static HashMap> combineSRHmList = new HashMap<>(16); - - /** - * 浜屾鍏宠仈鐢℉ashMap - * key---鍥涘厓缁 - * value---List瀛樻斁瀵瑰簲SIP鎴栬匯TP鏁版嵁 - * 瀛樻斁鏁版嵁:rtp-single,rtp-two,sip-two - * 涓嶅瓨鏀剧殑鏁版嵁:sip-single涓巗ip-in - */ - private static HashMap> secCombineSRHmList = new HashMap<>(16); - - @Override - public void process(Context context, Iterable> input, Collector output) throws Exception { - for (Tuple3 tuple : input) { - //鎷兼帴鐨勫洓鍏冪粍 - String fourKey = tuple.f0; - //宸插叧鑱旂殑sip,rtp;鏈叧鑱旂殑sip,rtp;鍐呯綉鐨剆ip - String type = tuple.f1; - String msg = tuple.f2; - switch (type) { - //鍗曞悜娴佸鍑嗗悗鐨凷IP - case "sip-two": - //鍗曞悜娴佸鍑嗗悗鐨凴TP - case "rtp-two": - //瀵逛笉涓婄殑RTP - case "rtp-single": - putKeyAndMsg(msg, fourKey, combineSRHmList); - break; - //鍗曞悜娴佺殑SIP - case "sip-single": - //鍐呯綉鐨凷IP - case "sip-in": - output.collect(msg); - break; - default: - logger.error("type is beyond expectation:" + type); - break; - - } - } - //鍒濇鍏宠仈 - tickCombineHmList(combineSRHmList, output); - //鍜岀紦瀛樹腑鐨勬暟鎹簩娆″叧鑱 - tickCombineHmList(secCombineSRHmList, output); - } - - - /** - * 瀹氭椂鍏宠仈,鍖呮嫭鍒濇鍏宠仈浠ュ強鍚庣画浜屾鍏宠仈 - * - * @param combineHmList - */ - private void tickCombineHmList(HashMap> combineHmList, Collector output) { - if (combineHmList.size() > 0) { - long nowTime = System.currentTimeMillis() / 1000; - - HashMap> tempCombineSRhmList = new HashMap<>(combineHmList); - combineHmList.clear(); - - for (String fourStrKey : 
tempCombineSRhmList.keySet()) { - - LinkedList tempList = tempCombineSRhmList.get(fourStrKey); - //鍖呭惈SIP鍜孯TP - int listSize = tempList.size(); - System.out.println(listSize); - if (listSize > 1) { - - List sipBeanArr = new ArrayList<>(); - List rtpBeanArr = new ArrayList<>(); - - for (String message : tempList) { - Object tempSipOrRtpLog = JSONObject.parseObject(message, voipObject.getClass()); - String schemaType = JsonParseUtil.getString(tempSipOrRtpLog, JsonProConfig.SCHEMA_TYPE); - if (JsonProConfig.SIP_MARK.equals(schemaType)) { - sipBeanArr.add(message); - } else if (JsonProConfig.RTP_MARK.equals(schemaType)) { - rtpBeanArr.add(message); - } - } - int rtpSize = rtpBeanArr.size(); - int sipSize = sipBeanArr.size(); - - if (rtpSize == 1 && sipSize >= 1) { - for (String sipMessage : sipBeanArr) { - Object rtpLog = JSONObject.parseObject(rtpBeanArr.get(0), voipObject.getClass()); - Object voipLog = JSONObject.parseObject(sipMessage, voipObject.getClass()); - accumulateVoipMsg(voipLog, rtpLog); - JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog)); - //鍥涘厓缁,voip,鍏宠仈鍚庣殑鏁版嵁 - output.collect(mergeJson(voipLog, rtpLog)); -// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog))); - } - } else if (sipSize == 1 && rtpSize >= 1) { - for (String rtpMessage : rtpBeanArr) { - Object rtpLog = JSONObject.parseObject(rtpMessage, voipObject.getClass()); - Object voipLog = JSONObject.parseObject(sipBeanArr.get(0), voipObject.getClass()); - accumulateVoipMsg(voipLog, rtpLog); - JsonParseUtil.setValue(voipLog, JsonProConfig.SCHEMA_TYPE, "VoIP"); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_ORIGINATOR_DIR, judgeDirection(rtpLog, voipLog)); - //鍥涘厓缁,voip,鍏宠仈鍚庣殑鏁版嵁 - output.collect(mergeJson(voipLog, rtpLog)); -// basicOutputCollector.emit(new Values(fourStrKey, "voip", mergeJson(voipLog, rtpLog))); - } - } else { - logger.warn("RTP-listSize is [" + rtpBeanArr.size() + "];SIP-listSize is [" + sipBeanArr.size() + "] in this tempCombineSRHmList! 
Not logical"); - sendErrorLogToKafka(sipBeanArr, output); - sendErrorLogToKafka(rtpBeanArr, output); - } - - } else { - String msg = tempList.get(0); - - Object voipLog = JSONObject.parseObject(msg, voipObject.getClass()); - long commonEndTime = JsonParseUtil.getLong(voipLog, JsonProConfig.END_TIME); - long intervalTime = nowTime - commonEndTime; - if (intervalTime <= VoipRelationConfig.SEC_COMBINE_SR_CACHE_SECS) { - putKeyAndMsg(msg, fourStrKey, secCombineSRHmList); - } else { - sendDirectlyOneElement(msg, voipLog, output); - } - - } - } - } - } - - /** - * 绱姞鍏宠仈鍚庣殑瀛楄妭绫诲弬鏁板 - * - * @param voipLog - * @param rtpLog - */ - private void accumulateVoipMsg(Object voipLog, Object rtpLog) { - - Long sumCommonSessions = JsonParseUtil.getLong(voipLog, JsonProConfig.SESSIONS) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.SESSIONS); - - Long sumCommonC2sPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_PKT_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_PKT_NUM); - - Long sumCommonS2cPktNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_PKT_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_PKT_NUM); - - Long sumCommonC2sByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_BYTE_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_BYTE_NUM); - - Long sumCommonS2cByteNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_BYTE_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_BYTE_NUM); - - Long sumCommonC2sIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_IPFRAG_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_IPFRAG_NUM); - - Long sumCommonS2cIpfragNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_IPFRAG_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_IPFRAG_NUM); - - Long sumCommonC2sTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_LOSTLEN) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_LOSTLEN); - - Long sumCommonS2cTcpLostlen = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_LOSTLEN) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_LOSTLEN); - - Long sumCommonC2sTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.C2S_TCP_UNORDER_NUM); - - Long sumCommonS2cTcpUnorderNum = JsonParseUtil.getLong(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM) - + JsonParseUtil.getLong(rtpLog, JsonProConfig.S2C_TCP_UNORDER_NUM); - - //common_sessions - JsonParseUtil.setValue(voipLog, JsonProConfig.SESSIONS, sumCommonSessions); - - //common_c2s_pkt_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_PKT_NUM, sumCommonC2sPktNum); - //common_s2c_pkt_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_PKT_NUM, sumCommonS2cPktNum); - //common_c2s_byte_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_BYTE_NUM, sumCommonC2sByteNum); - //common_s2c_byte_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_BYTE_NUM, sumCommonS2cByteNum); - - //common_c2s_ipfrag_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_IPFRAG_NUM, sumCommonC2sIpfragNum); - //common_s2c_ipfrag_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_IPFRAG_NUM, sumCommonS2cIpfragNum); - - //common_c2s_tcp_lostlen - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_LOSTLEN, sumCommonC2sTcpLostlen); - //common_s2c_tcp_lostlen - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_LOSTLEN, sumCommonS2cTcpLostlen); - - //common_c2s_tcp_unorder_num - JsonParseUtil.setValue(voipLog, JsonProConfig.C2S_TCP_UNORDER_NUM, 
sumCommonC2sTcpUnorderNum); - //common_s2c_tcp_unorder_num - JsonParseUtil.setValue(voipLog, JsonProConfig.S2C_TCP_UNORDER_NUM, sumCommonS2cTcpUnorderNum); - - } - - /** - * 瀹氭椂澶勭悊涓璍ist鍏冪礌鏁颁粎涓1鐨勬儏鍐 - */ - private void sendDirectlyOneElement(String msg, Object voipLog, Collector output) { - //鍥涘厓缁,sip(涓瀹氫负鍙屼晶)/rtp(鍙兘涓哄崟渚т篃鍙兘涓哄弻渚,鐪嬪崟鍚戞祦瀛楁淇℃伅),鎷垮嚭鏉ョ殑鍘熷鏁版嵁 - String commonSchemaType = JsonParseUtil.getString(voipLog, JsonProConfig.SCHEMA_TYPE); - if (JsonProConfig.SIP_MARK.equals(commonSchemaType)) { - output.collect(msg); - } else if (JsonProConfig.RTP_MARK.equals(commonSchemaType)) { - int commonStreamDir = JsonParseUtil.getInteger(voipLog, JsonProConfig.STREAM_DIR); - if (commonStreamDir != JsonProConfig.DOUBLE) { - output.collect(msg); - } else { - output.collect(msg); - } - } - } - - - /** - * 瀛樻斁key骞舵坊鍔犲搴擫ist - */ - private static void putKeyAndMsg(String message, String fourStrKey, HashMap> combineSRHmList) { - if (combineSRHmList.containsKey(fourStrKey)) { - LinkedList tmpList = combineSRHmList.get(fourStrKey); - tmpList.add(message); - combineSRHmList.put(fourStrKey, tmpList); - } else { - LinkedList tmpList = new LinkedList<>(); - tmpList.add(message); - combineSRHmList.put(fourStrKey, tmpList); - } - } - - /** - * 鍒ゆ柇RTP涓诲彨鏂瑰悜-娴嬭瘯 - * - * @param rtpLog RTP鍘熷鏃ュ織 - * @param voipLog 铻嶅悎鍚嶸OIP鏃ュ織 - * @return 鏂瑰悜 0锛氭湭鐭 1锛歝2s 2锛歴2c - */ - private static int judgeDirection(Object rtpLog, Object voipLog) { - - String ip = JsonParseUtil.getString(rtpLog, JsonProConfig.CLIENT_IP); - String sipOriginatorIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_ORIGINATOR_IP); - String sipResponderIp = JsonParseUtil.getString(voipLog, JsonProConfig.SIP_RESPONDER_IP); - - if (StringUtil.isNotBlank(ip) && ip.equals(sipOriginatorIp)) { - return 1; - } else if (StringUtil.isNotBlank(ip) && ip.equals(sipResponderIp)) { - return 2; - } - - return 0; - } - - /** - * 鍙戦佷笉绗﹀悎閫昏緫鐨勬棩蹇楀埌kafka - */ - private static void sendErrorLogToKafka(List logList, Collector output) { - if (logList.size() > 0) { - for (String log : logList) { - output.collect(log); - } - } - } - - /** - * - */ - private static String mergeJson(Object voipLog, Object rtpLog) { - - int rtpPayloadTypeC2s = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S); - int rtpPayloadTypeS2c = JsonParseUtil.getInteger(rtpLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C); - String rtpPcapPath = JsonParseUtil.getString(rtpLog, JsonProConfig.RTP_PCAP_PATH); - - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_C2S, rtpPayloadTypeC2s); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PAYLOAD_TYPE_S2C, rtpPayloadTypeS2c); - JsonParseUtil.setValue(voipLog, JsonProConfig.RTP_PCAP_PATH, rtpPcapPath); - - return JSONObject.toJSONString(voipLog); - } - -} diff --git a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java index 2215d5b..3718775 100644 --- a/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/SipCalibrationWindowFunction.java @@ -8,11 +8,14 @@ import com.zdjizhi.common.VoipRelationConfig; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.json.JsonParseUtil; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; 
import org.apache.flink.util.Collector; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; /** * @author qidaijie @@ -20,7 +23,7 @@ import java.util.*; * @Description: * @date 2021/7/2113:55 */ -public class SipCalibrationWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> { +public class SipCalibrationWindowFunction extends ProcessAllWindowFunction, String, TimeWindow> { private static final Log logger = LogFactory.get(); /** @@ -52,8 +55,8 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction> secCombineSRHmList = new HashMap<>(16); @Override - @SuppressWarnings("unchecked") - public void process(String key, Context context, Iterable> input, Collector output) throws Exception { + public void process(Context context, Iterable> input, Collector output) throws Exception { + logger.error("windowall绐楀彛杩愯"); for (Tuple3 tuple : input) { //鎷兼帴鐨勫洓鍏冪粍 String fourKey = tuple.f0; @@ -63,16 +66,17 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction tempList = tempCombineSRhmList.get(fourStrKey); //鍖呭惈SIP鍜孯TP int listSize = tempList.size(); - System.out.println(listSize); if (listSize > 1) { List sipBeanArr = new ArrayList<>(); @@ -156,12 +160,12 @@ public class SipCalibrationWindowFunction extends ProcessWindowFunction getKafkaConsumer() { - FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(VoipRelationConfig.INPUT_KAFKA_TOPIC, + FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(VoipRelationConfig.SOURCE_KAFKA_TOPIC, new SimpleStringSchema(), createConsumerConfig()); kafkaConsumer.setCommitOffsetsOnCheckpoints(true); diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java similarity index 87% rename from src/main/java/com/zdjizhi/utils/kafka/Producer.java rename to src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index bf63098..65e253a 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -13,11 +13,11 @@ import java.util.Properties; * @Description: * @date 2021/6/814:04 */ -public class Producer { +public class KafkaProducer { private static Properties createProducerConfig() { Properties properties = new Properties(); - properties.put("bootstrap.servers", VoipRelationConfig.OUTPUT_KAFKA_SERVERS); + properties.put("bootstrap.servers", VoipRelationConfig.SINK_KAFKA_SERVERS); properties.put("acks", VoipRelationConfig.PRODUCER_ACK); properties.put("retries", VoipRelationConfig.RETRIES); properties.put("linger.ms", VoipRelationConfig.LINGER_MS); @@ -27,7 +27,7 @@ public class Producer { properties.put("max.request.size", VoipRelationConfig.MAX_REQUEST_SIZE); properties.put("compression.type", VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); - CertUtils.chooseCert(VoipRelationConfig.KAFKA_SINK_PROTOCOL, properties); + CertUtils.chooseCert(VoipRelationConfig.SINK_KAFKA_SERVERS, properties); return properties; } @@ -35,7 +35,7 @@ public class Producer { public static FlinkKafkaProducer getKafkaProducer() { FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer( - VoipRelationConfig.OUTPUT_KAFKA_TOPIC, + VoipRelationConfig.SINK_KAFKA_TOPIC, new SimpleStringSchema(), createProducerConfig(), Optional.empty());
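
Note on the port-based protocol selection: the patch drops the kafka.source.protocol / kafka.sink.protocol settings and instead passes the bootstrap-server strings (SOURCE_KAFKA_SERVERS / SINK_KAFKA_SERVERS) into CertUtils.chooseCert, so the SASL-vs-SSL decision is derived from the connection port. The CertUtils hunk itself is not visible above, so the following is only a rough sketch of what such a check could look like; the port mapping (9092 -> SASL, any other port such as 9094 -> SSL), the truststore path, and the class name are assumptions, not the actual implementation.

import java.util.Properties;

import com.zdjizhi.common.VoipRelationConfig;

// Rough sketch only; the real CertUtils.chooseCert is not shown in this patch.
public final class PortBasedProtocolSketch {

    // Assumption: plain SASL listener on 9092, SSL listener on any other port (e.g. 9094).
    private static final String SASL_PORT = "9092";

    public static void chooseCert(String bootstrapServers, Properties properties) {
        // Take the port of the first broker in the comma-separated server list.
        String firstServer = bootstrapServers.split(",")[0].trim();
        String port = firstServer.substring(firstServer.lastIndexOf(':') + 1);

        if (SASL_PORT.equals(port)) {
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.mechanism", "PLAIN");
            properties.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required"
                            + " username=\"" + VoipRelationConfig.KAFKA_USER + "\""
                            + " password=\"" + VoipRelationConfig.KAFKA_PIN + "\";");
        } else {
            properties.put("security.protocol", "SSL");
            // Placeholder path; the real certificate location is not visible in the patch.
            properties.put("ssl.truststore.location", "/path/to/truststore.jks");
            properties.put("ssl.truststore.password", VoipRelationConfig.KAFKA_PIN);
        }
    }
}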
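
The new default_config.properties keys (session.timeout.ms, max.poll.records, max.partition.fetch.bytes) are exposed through VoipRelationConfig as SESSION_TIMEOUT_MS, MAX_POLL_RECORDS and MAX_PARTITION_FETCH_BYTES, and the renamed KafkaConsumer builds its FlinkKafkaConsumer from SOURCE_KAFKA_TOPIC and createConsumerConfig(). The body of createConsumerConfig() is not fully visible above; a plausible wiring, consistent with the producer side shown in the KafkaProducer hunk, would be:

import java.util.Properties;

import com.zdjizhi.common.VoipRelationConfig;
import com.zdjizhi.utils.kafka.CertUtils;

// Sketch of the consumer configuration implied by the new keys; not the verbatim source.
final class ConsumerConfigSketch {
    static Properties createConsumerConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", VoipRelationConfig.SOURCE_KAFKA_SERVERS);
        properties.put("group.id", VoipRelationConfig.GROUP_ID);
        properties.put("session.timeout.ms", VoipRelationConfig.SESSION_TIMEOUT_MS);
        properties.put("max.poll.records", VoipRelationConfig.MAX_POLL_RECORDS);
        properties.put("max.partition.fetch.bytes", VoipRelationConfig.MAX_PARTITION_FETCH_BYTES);
        // SASL or SSL is now chosen by CertUtils from the port in the source server list.
        CertUtils.chooseCert(VoipRelationConfig.SOURCE_KAFKA_SERVERS, properties);
        return properties;
    }
}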
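
check.inner.network (exposed as CHECK_INNER_NETWORK, 0 = off, 1 = on) gates an internal-address check in the modified IPUtils, whose hunk is likewise not visible above. Purely as an illustration of what such a switch typically guards, a standard RFC 1918 private-range test is shown here; the actual IPUtils logic may differ.

// Illustrative RFC 1918 test only; not the project's IPUtils implementation.
public static boolean isInnerNetwork(String ip) {
    if (ip == null || ip.isEmpty()) {
        return false;
    }
    return ip.startsWith("10.")
            || ip.startsWith("192.168.")
            || ip.matches("172\\.(1[6-9]|2\\d|3[01])\\..*");
}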
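
VoIpRelationTopology now calls environment.execute(args[0]) instead of the hard-coded "VOIP-RELATION", so the job name must be supplied as the first program argument; launching without one throws ArrayIndexOutOfBoundsException inside the try block, which the catch (Exception e) only logs, so the job silently fails to start. A small defensive variant (illustrative, not part of the patch):

// Fall back to the old fixed name when no argument is supplied (illustrative only).
String jobName = (args != null && args.length > 0) ? args[0] : "VOIP-RELATION";
environment.execute(jobName);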