diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java index e5f1ba5..6b9f8d2 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java @@ -25,15 +25,15 @@ public class SIPPairingFunction extends KeyedProcessFunction valueState; + private transient ValueState valueState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); int minutes = getGlobalConfiguration().get(FusionConfigs.SIP_STATE_CLEAR_INTERVAL); fireInterval = Time.minutes(minutes); - final ValueStateDescriptor descriptor = - new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class); + final ValueStateDescriptor descriptor = + new ValueStateDescriptor<>("sip-state", ObjectNode.class); final StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(fireInterval) @@ -53,38 +53,30 @@ public class SIPPairingFunction extends KeyedProcessFunction, ObjectNode, ObjectNode>.OnTimerContext ctx, Collector out) throws Exception { - final ObjectNodeInfo info = valueState.value(); - if (info != null) { - if (info.getExpireTime() <= timestamp) { - valueState.clear(); - } - registerIntervalFireTimestamp(ctx.timerService()); - } + valueState.clear(); } } diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java index c6f450f..19dc44e 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java @@ -5,9 +5,7 @@ import com.zdjizhi.flink.voip.records.Record; import com.zdjizhi.flink.voip.records.SchemaType; import com.zdjizhi.flink.voip.records.StreamDir; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.state.StateTtlConfig; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; @@ -15,6 +13,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.util.Collector; +import java.util.Iterator; +import java.util.Map; + /** * The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic * for SIP and RTP records. It combines SIP and RTP records belonging to the same session @@ -29,8 +30,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction sipDoubleDirState; - private transient ValueState rtpState; + private transient ValueState sipState; + private transient MapState rtpState; @Override public void open(Configuration parameters) throws Exception { @@ -41,9 +42,6 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction sipDescriptor = new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class); - final ValueStateDescriptor rtpDescriptor = - new ValueStateDescriptor<>("rtp-state", ObjectNodeInfo.class); - final StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(fireInterval) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) @@ -51,78 +49,109 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction rtpDescriptor = + new MapStateDescriptor<>("rtp-state", StreamDir.class, ObjectNode.class); + + final StateTtlConfig rtpTtlConfig = StateTtlConfig + .newBuilder(Time.minutes(minutes + 1)) + .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) + .useProcessingTime() + .cleanupFullSnapshot() + .build(); + rtpDescriptor.enableTimeToLive(rtpTtlConfig); + rtpState = context.getMapState(rtpDescriptor); } // SIP @Override - public void processElement2(ObjectNode obj, + public void processElement2(ObjectNode sipObj, KeyedCoProcessFunction, ObjectNode, ObjectNode, ObjectNode>.Context ctx, Collector out) throws Exception { - final ObjectNodeInfo info = rtpState.value(); - if (info != null) { - final Record record = new Record(info.getObj()); - record.merge(obj) - .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); - out.collect(info.getObj()); - if (record.getStreamDir() != StreamDir.DOUBLE.getValue()) { - sipDoubleDirState.update(new ObjectNodeInfo(obj, - ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds())); - } - } else { - sipDoubleDirState.update(new ObjectNodeInfo(obj, - ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds())); + final Iterator> iterator = rtpState.iterator(); + if (rtpState.isEmpty()) { + sipState.update(new ObjectNodeInfo(sipObj, 0)); } - registerNextFireTimestamp(ctx.timerService(), ctx.getCurrentKey().hashCode()); + while (iterator.hasNext()) { + final Map.Entry entry = iterator.next(); + final ObjectNode rtpObj = entry.getValue(); + final Record rtpRecord = new Record(rtpObj); + + rtpRecord.merge(sipObj) + .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); + out.collect(rtpObj); + + switch (entry.getKey()) { + case S2C: + case C2S: + ObjectNodeInfo info = sipState.value(); + if (info != null) { + info.incTimes(); + if (info.getTimes() >= MAX_RTP_LINES) { + sipState.clear(); + } else { + sipState.update(new ObjectNodeInfo(sipObj, info.getTimes())); + } + } else { + sipState.update(new ObjectNodeInfo(sipObj, 1)); + } + break; + default: + // Double directional: + // In the context of VoIP fusion, only one RTP double directional stream + sipState.clear(); + } + } + + registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds()); } // RTP @Override - public void processElement1(ObjectNode obj, + public void processElement1(ObjectNode rtpObj, KeyedCoProcessFunction, ObjectNode, ObjectNode, ObjectNode>.Context ctx, Collector out) throws Exception { - final Record record = new Record(obj); - final ObjectNodeInfo info = sipDoubleDirState.value(); + final Record rtpRecord = new Record(rtpObj); + final ObjectNodeInfo info = sipState.value(); + + final StreamDir streamDir = rtpRecord.getStreamDir(); if (null != info) { - record.merge(info.getObj()) + + rtpRecord.merge(info.getObj()) .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); - out.collect(obj); - // In the context of VoIP fusion, only one RTP double directional stream - if (record.getStreamDir() == StreamDir.DOUBLE.getValue() - || info.getTimes() >= MAX_RTP_LINES - 1) { - sipDoubleDirState.clear(); - } else { // Save the number of fused RTP unidirectional streams - info.setTimes(info.getTimes() + 1); - sipDoubleDirState.update(info); + out.collect(rtpObj); + + switch (streamDir) { + case C2S: + case S2C: + info.incTimes(); + if (info.getTimes() >= MAX_RTP_LINES) { + sipState.clear(); + } + break; + default: + // Double + sipState.clear(); } } else { - rtpState.update(new ObjectNodeInfo(obj, - ctx.timerService() - .currentProcessingTime() + fireInterval.toMilliseconds())); + rtpState.put(streamDir, rtpObj); } - registerNextFireTimestamp(ctx.timerService(), ctx.getCurrentKey().hashCode()); + + registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds()); } @Override public void onTimer(long timestamp, KeyedCoProcessFunction, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx, Collector out) throws Exception { - final ObjectNodeInfo rtpInfo = rtpState.value(); - if (rtpInfo != null && rtpInfo.getExpireTime() <= timestamp) { - out.collect(rtpInfo.getObj()); - rtpState.clear(); - } - final ObjectNodeInfo sipInfo = sipDoubleDirState.value(); - if (sipInfo != null && sipInfo.getExpireTime() <= timestamp) { - sipDoubleDirState.clear(); - } - if (rtpInfo != null || sipInfo != null) { - registerIntervalFireTimestamp(ctx.timerService()); + for (ObjectNode obj : rtpState.values()) { + out.collect(obj); } + rtpState.clear(); + sipState.clear(); } } \ No newline at end of file