diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java new file mode 100644 index 0000000..b2be3af --- /dev/null +++ b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java @@ -0,0 +1,119 @@ +package com.zdjizhi.flink.voip.functions; + +import com.zdjizhi.flink.voip.conf.FusionConfigs; +import com.zdjizhi.flink.voip.records.Record; +import com.zdjizhi.flink.voip.records.SIPRecord; +import com.zdjizhi.flink.voip.records.SchemaType; +import com.zdjizhi.flink.voip.records.StreamDir; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.Iterator; +import java.util.Map; + +/** + * The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic + * for SIP and RTP records. It combines SIP and RTP records belonging to the same session + * and emits fused VoIP records. The function utilizes keyed state to store and manage SIP and + * RTP records, and it uses timers to trigger regular clearing of the state. + * + * @author chaoc + * @since 1.0 + */ +public class VoIPFusionFunction extends KeyedCoProcessFunction + implements FunctionHelper { + + private static final int MAX_RTP_LINES = 2; + private transient Time fireInterval; + private transient MapState sipDoubleDirState; + private transient MapState rtpState; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + final Long milliseconds = getGlobalConfiguration().get(FusionConfigs.RTP_STATE_CLEAR_INTERVAL); + fireInterval = Time.milliseconds(milliseconds); + sipDoubleDirState = getMapState("sip-state", Address.class, ObjectNodeInfo.class); + rtpState = getMapState("rtp-state", Address.class, ObjectNodeInfo.class); + } + + @Override + public void processElement1(ObjectNode obj, + KeyedCoProcessFunction.Context ctx, + Collector out) throws Exception { + final SIPRecord record = new SIPRecord(obj); + final Address address = Address.of( + Tuple2.of(record.getOriginatorSdpConnectIp(), record.getOriginatorSdpMediaPort()), + Tuple2.of(record.getResponderSdpConnectIp(), record.getResponderSdpMediaPort())); + sipDoubleDirState.put(address, new ObjectNodeInfo(obj, + ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds())); + registerNextFireTimestamp(ctx.timerService(), fireInterval); + } + + @Override + public void processElement2(ObjectNode obj, + KeyedCoProcessFunction.Context ctx, + Collector out) throws Exception { + final Record record = new Record(obj); + final Address address = Address.of(Tuple2.of(record.getServerIp(), record.getServerPort()), + Tuple2.of(record.getClientIp(), record.getClientPort())); + if (sipDoubleDirState.contains(address)) { + final ObjectNodeInfo info = sipDoubleDirState.get(address); + record.merge(info.getObj()) + .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); + out.collect(obj); + // In the context of VoIP fusion, only one RTP double directional stream + if (record.getStreamDir() == StreamDir.DOUBLE.getValue()) { + sipDoubleDirState.remove(address); + } else { // Save the number of fused RTP unidirectional streams + info.setTimes(info.getTimes() + 1); + sipDoubleDirState.put(address, info); + } + + } else { + rtpState.put(address, new ObjectNodeInfo(obj, + ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds())); + } + registerNextFireTimestamp(ctx.timerService(), fireInterval); + } + + @Override + public void onTimer(long timestamp, + KeyedCoProcessFunction.OnTimerContext ctx, + Collector out) throws Exception { + final Iterator> iterator = rtpState.iterator(); + while (iterator.hasNext()) { + final Map.Entry entry = iterator.next(); + final Address address = entry.getKey(); + final ObjectNode obj = entry.getValue().getObj(); + final Record record = new Record(obj); + if (sipDoubleDirState.contains(address)) { + final ObjectNodeInfo info = sipDoubleDirState.get(address); + if (record.getStreamDir() == StreamDir.DOUBLE.getValue() + || info.getTimes() >= MAX_RTP_LINES - 1 + /* One RTP unidirectional stream has already been fused*/) { + sipDoubleDirState.remove(address); + } + record.merge(info.getObj()) + .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); + out.collect(obj); + } + if (entry.getValue().getExpireTime() <= timestamp) { + rtpState.remove(address); + } + } + final Iterator> sipIterator = sipDoubleDirState.iterator(); + while (sipIterator.hasNext()) { + final Map.Entry entry = sipIterator.next(); + if (entry.getValue().getExpireTime() <= timestamp) { + sipDoubleDirState.remove(entry.getKey()); + } + } + registerNextFireTimestamp(ctx.timerService(), fireInterval); + } +} \ No newline at end of file