diff --git a/src/it/java/com/zdjizhi/flink/voip/FusionTest.java b/src/it/java/com/zdjizhi/flink/voip/FusionTest.java index 59fba45..af17682 100644 --- a/src/it/java/com/zdjizhi/flink/voip/FusionTest.java +++ b/src/it/java/com/zdjizhi/flink/voip/FusionTest.java @@ -3,6 +3,7 @@ package com.zdjizhi.flink.voip; import com.zdjizhi.flink.voip.functions.*; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -67,7 +68,7 @@ public class FusionTest { .name("PairingOneWayToDoubleStream") .uid("pairing-one-way-to-double"); - final KeySelector vSysSelector = new VSysIDKeySelector(); + final KeySelector> vSysSelector = new VSysIDKeySelector(); // Fusion SIP data and RTP data to VoIP data. final SingleOutputStreamOperator voIpOperator = rtpDataStream diff --git a/src/main/java/com/zdjizhi/flink/voip/FusionApp.java b/src/main/java/com/zdjizhi/flink/voip/FusionApp.java index 6c16e3a..b7bd64a 100644 --- a/src/main/java/com/zdjizhi/flink/voip/FusionApp.java +++ b/src/main/java/com/zdjizhi/flink/voip/FusionApp.java @@ -77,7 +77,7 @@ public class FusionApp { .name("PairingOneWayToDoubleStream") .uid("pairing-one-way-to-double"); - final KeySelector vSysSelector = new VSysIDKeySelector(); + final KeySelector> vSysSelector = new VSysIDKeySelector(); // Fusion SIP data and RTP data to VoIP data. final SingleOutputStreamOperator voIpOperator = rtpDataStream diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/Address.java b/src/main/java/com/zdjizhi/flink/voip/functions/Address.java index aacd25c..3642f58 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/Address.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/Address.java @@ -17,7 +17,7 @@ import java.util.List; */ @Data @AllArgsConstructor -class Address { +public class Address { // The first IP address. private final String ip1; diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java b/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java index f95676d..363f920 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java @@ -4,7 +4,6 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimerService; @@ -42,6 +41,7 @@ public interface FunctionHelper extends RichFunction { final MapStateDescriptor descriptor = new MapStateDescriptor<>(name, key, value); return getRuntimeContext().getMapState(descriptor); } + int INTERVAL = 1000 * 60; /** @@ -55,4 +55,8 @@ public interface FunctionHelper extends RichFunction { long nextFireTime = (current / INTERVAL + 1) * INTERVAL + factor % INTERVAL; timerService.registerProcessingTimeTimer(nextFireTime); } + + default void registerIntervalFireTimestamp(TimerService timerService) { + timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + INTERVAL); + } } diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/SIPKeySelector.java b/src/main/java/com/zdjizhi/flink/voip/functions/SIPKeySelector.java index 91c1f0f..678f9c1 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/SIPKeySelector.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/SIPKeySelector.java @@ -3,6 +3,7 @@ package com.zdjizhi.flink.voip.functions; import com.zdjizhi.flink.voip.records.SIPRecord; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; /** @@ -11,18 +12,20 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje * @author chaoc * @since 1.0 */ -public class SIPKeySelector implements KeySelector> { +public class SIPKeySelector implements KeySelector> { /** - * Extracts the composite key (VSysID, CallID) from the given ObjectNode. + * Extracts the composite key (VSysID, CallID, Address) from the given ObjectNode. * * @param obj The ObjectNode representing a SIP record. - * @return A Tuple2 containing the extracted key (VSysID, CallID). + * @return A Tuple3 containing the extracted key (VSysID, CallID, Address). * @throws Exception Thrown if an error occurs during key extraction. */ @Override - public Tuple2 getKey(ObjectNode obj) throws Exception { + public Tuple3 getKey(ObjectNode obj) throws Exception { final SIPRecord record = new SIPRecord(obj); - return Tuple2.of(record.getVSysID(), record.getCallID()); + final Address address = Address.of(Tuple2.of(record.getClientIp(), record.getClientPort()), + Tuple2.of(record.getServerIp(), record.getServerPort())); + return Tuple3.of(record.getVSysID(), record.getCallID(), address); } } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java index 6441b45..ac88126 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java @@ -3,19 +3,17 @@ package com.zdjizhi.flink.voip.functions; import com.zdjizhi.flink.voip.conf.FusionConfigs; import com.zdjizhi.flink.voip.records.Record; import com.zdjizhi.flink.voip.records.StreamDir; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; -import java.util.Iterator; -import java.util.Map; - /** * A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction. * SIP records are paired when they have the same addresses but opposite stream directions. @@ -23,20 +21,20 @@ import java.util.Map; * @author chaoc * @since 1.0 */ -public class SIPPairingFunction extends KeyedProcessFunction, ObjectNode, ObjectNode> +public class SIPPairingFunction extends KeyedProcessFunction, ObjectNode, ObjectNode> implements FunctionHelper { private transient Time fireInterval; - private transient MapState mapState; + + private transient ValueState valueState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); int minutes = getGlobalConfiguration().get(FusionConfigs.SIP_STATE_CLEAR_INTERVAL); fireInterval = Time.minutes(minutes); - - final MapStateDescriptor descriptor = - new MapStateDescriptor<>("sip-state", Address.class, ObjectNodeInfo.class); + final ValueStateDescriptor descriptor = + new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class); final StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(fireInterval) @@ -45,12 +43,12 @@ public class SIPPairingFunction extends KeyedProcessFunction, ObjectNode, ObjectNode>.Context ctx, + KeyedProcessFunction, ObjectNode, ObjectNode>.Context ctx, Collector out) throws Exception { final Record record = new Record(value); @@ -61,18 +59,17 @@ public class SIPPairingFunction extends KeyedProcessFunction, ObjectNode, ObjectNode>.OnTimerContext ctx, + KeyedProcessFunction, ObjectNode, ObjectNode>.OnTimerContext ctx, Collector out) throws Exception { - final Iterator> iterator = mapState.iterator(); - while (iterator.hasNext()) { - final Map.Entry entry = iterator.next(); - if (entry.getValue().getExpireTime() <= timestamp) { - iterator.remove(); - } + final ObjectNodeInfo info = valueState.value(); + if (info != null && info.getExpireTime() <= timestamp) { + valueState.clear(); } - ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + INTERVAL); + registerIntervalFireTimestamp(ctx.timerService()); } } diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VSysIDKeySelector.java b/src/main/java/com/zdjizhi/flink/voip/functions/VSysIDKeySelector.java index 9cbb1da..9633fe3 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/VSysIDKeySelector.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/VSysIDKeySelector.java @@ -1,27 +1,42 @@ package com.zdjizhi.flink.voip.functions; import com.zdjizhi.flink.voip.records.Record; +import com.zdjizhi.flink.voip.records.SIPRecord; +import com.zdjizhi.flink.voip.records.SchemaType; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.Objects; + /** * A KeySelector implementation that extracts the key(VSysID) from an ObjectNode. * * @author chaoc * @since 1.0 */ -public class VSysIDKeySelector implements KeySelector { +public class VSysIDKeySelector implements KeySelector> { /** - * Gets the VSysID from the provided ObjectNode. + * Extracts the composite key (VSysID, Address) from the given ObjectNode. * - * @param obj The input ObjectNode. - * @return The extracted key(VSysID). - * @throws Exception Thrown if there is an error extracting the VSysID. + * @param obj The ObjectNode representing a SIP record. + * @return A Tuple2 containing the extracted key (VSysID, Address). + * @throws Exception Thrown if an error occurs during key extraction. */ @Override - public Integer getKey(ObjectNode obj) throws Exception { + public Tuple2 getKey(ObjectNode obj) throws Exception { final Record record = new Record(obj); - return record.getVSysID(); + final Address address; + if (Objects.equals(record.getSchemaType(), SchemaType.SIP.getValue())) { + final SIPRecord sipRecord = new SIPRecord(obj); + address = Address.of( + Tuple2.of(sipRecord.getOriginatorSdpConnectIp(), sipRecord.getOriginatorSdpMediaPort()), + Tuple2.of(sipRecord.getResponderSdpConnectIp(), sipRecord.getResponderSdpMediaPort())); + } else { + address = Address.of(Tuple2.of(record.getServerIp(), record.getServerPort()), + Tuple2.of(record.getClientIp(), record.getClientPort())); + } + return Tuple2.of(record.getVSysID(), address); } } diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java index d00d8e2..dd19245 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java @@ -2,13 +2,12 @@ package com.zdjizhi.flink.voip.functions; import com.zdjizhi.flink.voip.conf.FusionConfigs; import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.SIPRecord; import com.zdjizhi.flink.voip.records.SchemaType; import com.zdjizhi.flink.voip.records.StreamDir; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; @@ -16,9 +15,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.util.Collector; -import java.util.Iterator; -import java.util.Map; - /** * The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic * for SIP and RTP records. It combines SIP and RTP records belonging to the same session @@ -28,21 +24,26 @@ import java.util.Map; * @author chaoc * @since 1.0 */ -public class VoIPFusionFunction extends KeyedCoProcessFunction +public class VoIPFusionFunction extends KeyedCoProcessFunction, ObjectNode, ObjectNode, ObjectNode> implements FunctionHelper { private static final int MAX_RTP_LINES = 2; private transient Time fireInterval; - private transient MapState sipDoubleDirState; - private transient MapState rtpState; + private transient ValueState sipDoubleDirState; + private transient ValueState rtpState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - final int minutes = getGlobalConfiguration() - .get(FusionConfigs.RTP_STATE_CLEAR_INTERVAL); + final int minutes = getGlobalConfiguration().get(FusionConfigs.RTP_STATE_CLEAR_INTERVAL); fireInterval = Time.minutes(minutes); final RuntimeContext context = getRuntimeContext(); + final ValueStateDescriptor sipDescriptor = + new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class); + sipDoubleDirState = context.getState(sipDescriptor); + final ValueStateDescriptor rtpDescriptor = + new ValueStateDescriptor<>("rtp-state", ObjectNodeInfo.class); + final StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(fireInterval) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) @@ -50,90 +51,77 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction sipDescriptor = - new MapStateDescriptor<>("sip-state", Address.class, ObjectNodeInfo.class); - sipDescriptor.enableTimeToLive(ttlConfig); - sipDoubleDirState = context.getMapState(sipDescriptor); - - final MapStateDescriptor rtpDescriptor = - new MapStateDescriptor<>("rtp-state", Address.class, ObjectNodeInfo.class); rtpDescriptor.enableTimeToLive(ttlConfig); - rtpState = context.getMapState(rtpDescriptor); + sipDescriptor.enableTimeToLive(ttlConfig); + + rtpState = context.getState(rtpDescriptor); } @Override public void processElement2(ObjectNode obj, - KeyedCoProcessFunction.Context ctx, + KeyedCoProcessFunction, ObjectNode, ObjectNode, ObjectNode>.Context ctx, Collector out) throws Exception { - final SIPRecord record = new SIPRecord(obj); - final Address address = Address.of( - Tuple2.of(record.getOriginatorSdpConnectIp(), record.getOriginatorSdpMediaPort()), - Tuple2.of(record.getResponderSdpConnectIp(), record.getResponderSdpMediaPort())); - sipDoubleDirState.put(address, new ObjectNodeInfo(obj, + sipDoubleDirState.update(new ObjectNodeInfo(obj, ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds())); registerNextFireTimestamp(ctx.timerService(), ctx.getCurrentKey().hashCode()); } @Override public void processElement1(ObjectNode obj, - KeyedCoProcessFunction.Context ctx, + KeyedCoProcessFunction, ObjectNode, ObjectNode, ObjectNode>.Context ctx, Collector out) throws Exception { final Record record = new Record(obj); - final Address address = Address.of(Tuple2.of(record.getServerIp(), record.getServerPort()), - Tuple2.of(record.getClientIp(), record.getClientPort())); - if (sipDoubleDirState.contains(address)) { - final ObjectNodeInfo info = sipDoubleDirState.get(address); + final ObjectNodeInfo info = sipDoubleDirState.value(); + if (null != info) { record.merge(info.getObj()) .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); out.collect(obj); // In the context of VoIP fusion, only one RTP double directional stream - if (record.getStreamDir() == StreamDir.DOUBLE.getValue()) { - sipDoubleDirState.remove(address); + if (record.getStreamDir() == StreamDir.DOUBLE.getValue() + || info.getTimes() >= MAX_RTP_LINES - 1) { + sipDoubleDirState.clear(); } else { // Save the number of fused RTP unidirectional streams info.setTimes(info.getTimes() + 1); - sipDoubleDirState.put(address, info); + sipDoubleDirState.update(info); } } else { - rtpState.put(address, new ObjectNodeInfo(obj, - ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds())); + rtpState.update(new ObjectNodeInfo(obj, + ctx.timerService() + .currentProcessingTime() + fireInterval.toMilliseconds())); } registerNextFireTimestamp(ctx.timerService(), ctx.getCurrentKey().hashCode()); } @Override public void onTimer(long timestamp, - KeyedCoProcessFunction.OnTimerContext ctx, + KeyedCoProcessFunction, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx, Collector out) throws Exception { - final Iterator> iterator = rtpState.iterator(); - while (iterator.hasNext()) { - final Map.Entry entry = iterator.next(); - final Address address = entry.getKey(); - final ObjectNode obj = entry.getValue().getObj(); + final ObjectNodeInfo rtpInfo = rtpState.value(); + if (rtpInfo != null) { + final ObjectNode obj = rtpInfo.getObj(); final Record record = new Record(obj); - if (sipDoubleDirState.contains(address)) { - final ObjectNodeInfo info = sipDoubleDirState.get(address); - if (record.getStreamDir() == StreamDir.DOUBLE.getValue() - || info.getTimes() >= MAX_RTP_LINES - 1 - /* One RTP unidirectional stream has already been fused*/) { - sipDoubleDirState.remove(address); - } - record.merge(info.getObj()) - .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); - out.collect(obj); + record.merge(obj) + .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); + out.collect(obj); + + if (record.getStreamDir() == StreamDir.DOUBLE.getValue() + || rtpInfo.getTimes() >= MAX_RTP_LINES - 1) { + sipDoubleDirState.clear(); + } else { // Save the number of fused RTP unidirectional streams + rtpInfo.setTimes(rtpInfo.getTimes() + 1); + sipDoubleDirState.update(rtpInfo); } - if (entry.getValue().getExpireTime() <= timestamp) { - out.collect(entry.getValue().getObj()); - iterator.remove(); + + if (rtpInfo.getExpireTime() <= timestamp) { + rtpState.clear(); } } - final Iterator> sipIterator = sipDoubleDirState.iterator(); - while (sipIterator.hasNext()) { - final Map.Entry entry = sipIterator.next(); - if (entry.getValue().getExpireTime() <= timestamp) { - sipIterator.remove(); - } + + final ObjectNodeInfo sipInfo = sipDoubleDirState.value(); + if (sipInfo != null && sipInfo.getExpireTime() <= timestamp) { + sipDoubleDirState.clear(); } - ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + INTERVAL); + registerIntervalFireTimestamp(ctx.timerService()); } } \ No newline at end of file diff --git a/src/test/java/com/zdjizhi/flink/voip/functions/SIPPairingFunctionTest.java b/src/test/java/com/zdjizhi/flink/voip/functions/SIPPairingFunctionTest.java index 06113a2..104c20e 100644 --- a/src/test/java/com/zdjizhi/flink/voip/functions/SIPPairingFunctionTest.java +++ b/src/test/java/com/zdjizhi/flink/voip/functions/SIPPairingFunctionTest.java @@ -9,7 +9,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -33,7 +33,7 @@ public class SIPPairingFunctionTest { @BeforeAll static void setUp() throws Exception { final SIPPairingFunction func = new SIPPairingFunction(); - final KeyedProcessOperator, ObjectNode, ObjectNode> operator = + final KeyedProcessOperator, ObjectNode, ObjectNode> operator = new KeyedProcessOperator<>(func); final TypeInformation type = TypeInformation.of(Integer.class); final KeySelector keySelector = jsonNodes -> 0;