feat: impl using the Address as the keyBy parameter

This commit is contained in:
chaoc
2023-08-09 16:38:34 +08:00
parent 6a444d38ba
commit 7221f2a52f
9 changed files with 111 additions and 106 deletions

View File

@@ -3,6 +3,7 @@ package com.zdjizhi.flink.voip;
import com.zdjizhi.flink.voip.functions.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -67,7 +68,7 @@ public class FusionTest {
.name("PairingOneWayToDoubleStream")
.uid("pairing-one-way-to-double");
final KeySelector<ObjectNode, Integer> vSysSelector = new VSysIDKeySelector();
final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();
// Fusion SIP data and RTP data to VoIP data.
final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream

View File

@@ -77,7 +77,7 @@ public class FusionApp {
.name("PairingOneWayToDoubleStream")
.uid("pairing-one-way-to-double");
final KeySelector<ObjectNode, Integer> vSysSelector = new VSysIDKeySelector();
final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();
// Fusion SIP data and RTP data to VoIP data.
final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream

View File

@@ -17,7 +17,7 @@ import java.util.List;
*/
@Data
@AllArgsConstructor
class Address {
public class Address {
// The first IP address.
private final String ip1;

View File

@@ -4,7 +4,6 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
@@ -42,6 +41,7 @@ public interface FunctionHelper extends RichFunction {
final MapStateDescriptor<K, V> descriptor = new MapStateDescriptor<>(name, key, value);
return getRuntimeContext().getMapState(descriptor);
}
int INTERVAL = 1000 * 60;
/**
@@ -55,4 +55,8 @@ public interface FunctionHelper extends RichFunction {
long nextFireTime = (current / INTERVAL + 1) * INTERVAL + factor % INTERVAL;
timerService.registerProcessingTimeTimer(nextFireTime);
}
default void registerIntervalFireTimestamp(TimerService timerService) {
timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + INTERVAL);
}
}

View File

@@ -3,6 +3,7 @@ package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.SIPRecord;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
/**
@@ -11,18 +12,20 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
* @author chaoc
* @since 1.0
*/
public class SIPKeySelector implements KeySelector<ObjectNode, Tuple2<Integer, String>> {
public class SIPKeySelector implements KeySelector<ObjectNode, Tuple3<Integer, String, Address>> {
/**
* Extracts the composite key (VSysID, CallID) from the given ObjectNode.
* Extracts the composite key (VSysID, CallID, Address) from the given ObjectNode.
*
* @param obj The ObjectNode representing a SIP record.
* @return A Tuple2 containing the extracted key (VSysID, CallID).
* @return A Tuple3 containing the extracted key (VSysID, CallID, Address).
* @throws Exception Thrown if an error occurs during key extraction.
*/
@Override
public Tuple2<Integer, String> getKey(ObjectNode obj) throws Exception {
public Tuple3<Integer, String, Address> getKey(ObjectNode obj) throws Exception {
final SIPRecord record = new SIPRecord(obj);
return Tuple2.of(record.getVSysID(), record.getCallID());
final Address address = Address.of(Tuple2.of(record.getClientIp(), record.getClientPort()),
Tuple2.of(record.getServerIp(), record.getServerPort()));
return Tuple3.of(record.getVSysID(), record.getCallID(), address);
}
}

View File

@@ -3,19 +3,17 @@ package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
/**
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
* SIP records are paired when they have the same addresses but opposite stream directions.
@@ -23,20 +21,20 @@ import java.util.Map;
* @author chaoc
* @since 1.0
*/
public class SIPPairingFunction extends KeyedProcessFunction<Tuple2<Integer, String>, ObjectNode, ObjectNode>
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
implements FunctionHelper {
private transient Time fireInterval;
private transient MapState<Address, ObjectNodeInfo> mapState;
private transient ValueState<ObjectNodeInfo> valueState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
int minutes = getGlobalConfiguration().get(FusionConfigs.SIP_STATE_CLEAR_INTERVAL);
fireInterval = Time.minutes(minutes);
final MapStateDescriptor<Address, ObjectNodeInfo> descriptor =
new MapStateDescriptor<>("sip-state", Address.class, ObjectNodeInfo.class);
final ValueStateDescriptor<ObjectNodeInfo> descriptor =
new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class);
final StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(fireInterval)
@@ -45,12 +43,12 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple2<Integer, Str
.cleanupFullSnapshot()
.build();
descriptor.enableTimeToLive(ttlConfig);
mapState = getRuntimeContext().getMapState(descriptor);
valueState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(ObjectNode value,
KeyedProcessFunction<Tuple2<Integer, String>, ObjectNode, ObjectNode>.Context ctx,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(value);
@@ -61,18 +59,17 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple2<Integer, Str
Tuple2.of(record.getServerIp(), record.getServerPort()));
// If the address is already stored in the mapState and has the opposite stream direction,
// merge the SIP records, change the stream direction to DOUBLE, and output the merged record.
if (mapState.contains(address) &&
new Record(mapState.get(address).getObj()).getStreamDir() != record.getStreamDir()
/* TODO consider stream direction */) {
record.merge(mapState.get(address).getObj());
final ObjectNodeInfo info = valueState.value();
if (null != info && new Record(info.getObj()).getStreamDir() != record.getStreamDir()) {
record.merge(info.getObj());
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
out.collect(value);
mapState.remove(address);
valueState.clear();
} else {
// If the address is not yet in the mapState, add it with its expiration time.
final ObjectNodeInfo info = new ObjectNodeInfo(value,
final ObjectNodeInfo nodeInfo = new ObjectNodeInfo(value,
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds());
mapState.put(address, info);
valueState.update(nodeInfo);
}
} else {
// If SIP is a double stream, pairing isn't required, directly output the record.
@@ -83,15 +80,12 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple2<Integer, Str
@Override
public void onTimer(long timestamp,
KeyedProcessFunction<Tuple2<Integer, String>, ObjectNode, ObjectNode>.OnTimerContext ctx,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
final Iterator<Map.Entry<Address, ObjectNodeInfo>> iterator = mapState.iterator();
while (iterator.hasNext()) {
final Map.Entry<Address, ObjectNodeInfo> entry = iterator.next();
if (entry.getValue().getExpireTime() <= timestamp) {
iterator.remove();
}
final ObjectNodeInfo info = valueState.value();
if (info != null && info.getExpireTime() <= timestamp) {
valueState.clear();
}
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + INTERVAL);
registerIntervalFireTimestamp(ctx.timerService());
}
}

View File

@@ -1,27 +1,42 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Objects;
/**
* A KeySelector implementation that extracts the key(VSysID) from an ObjectNode.
*
* @author chaoc
* @since 1.0
*/
public class VSysIDKeySelector implements KeySelector<ObjectNode, Integer> {
public class VSysIDKeySelector implements KeySelector<ObjectNode, Tuple2<Integer, Address>> {
/**
* Gets the VSysID from the provided ObjectNode.
* Extracts the composite key (VSysID, Address) from the given ObjectNode.
*
* @param obj The input ObjectNode.
* @return The extracted key(VSysID).
* @throws Exception Thrown if there is an error extracting the VSysID.
* @param obj The ObjectNode representing a SIP record.
* @return A Tuple2 containing the extracted key (VSysID, Address).
* @throws Exception Thrown if an error occurs during key extraction.
*/
@Override
public Integer getKey(ObjectNode obj) throws Exception {
public Tuple2<Integer, Address> getKey(ObjectNode obj) throws Exception {
final Record record = new Record(obj);
return record.getVSysID();
final Address address;
if (Objects.equals(record.getSchemaType(), SchemaType.SIP.getValue())) {
final SIPRecord sipRecord = new SIPRecord(obj);
address = Address.of(
Tuple2.of(sipRecord.getOriginatorSdpConnectIp(), sipRecord.getOriginatorSdpMediaPort()),
Tuple2.of(sipRecord.getResponderSdpConnectIp(), sipRecord.getResponderSdpMediaPort()));
} else {
address = Address.of(Tuple2.of(record.getServerIp(), record.getServerPort()),
Tuple2.of(record.getClientIp(), record.getClientPort()));
}
return Tuple2.of(record.getVSysID(), address);
}
}

View File

@@ -2,13 +2,12 @@ package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -16,9 +15,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
/**
* The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic
* for SIP and RTP records. It combines SIP and RTP records belonging to the same session
@@ -28,21 +24,26 @@ import java.util.Map;
* @author chaoc
* @since 1.0
*/
public class VoIPFusionFunction extends KeyedCoProcessFunction<Integer, ObjectNode, ObjectNode, ObjectNode>
public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>
implements FunctionHelper {
private static final int MAX_RTP_LINES = 2;
private transient Time fireInterval;
private transient MapState<Address, ObjectNodeInfo> sipDoubleDirState;
private transient MapState<Address, ObjectNodeInfo> rtpState;
private transient ValueState<ObjectNodeInfo> sipDoubleDirState;
private transient ValueState<ObjectNodeInfo> rtpState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
final int minutes = getGlobalConfiguration()
.get(FusionConfigs.RTP_STATE_CLEAR_INTERVAL);
final int minutes = getGlobalConfiguration().get(FusionConfigs.RTP_STATE_CLEAR_INTERVAL);
fireInterval = Time.minutes(minutes);
final RuntimeContext context = getRuntimeContext();
final ValueStateDescriptor<ObjectNodeInfo> sipDescriptor =
new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class);
sipDoubleDirState = context.getState(sipDescriptor);
final ValueStateDescriptor<ObjectNodeInfo> rtpDescriptor =
new ValueStateDescriptor<>("rtp-state", ObjectNodeInfo.class);
final StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(fireInterval)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
@@ -50,90 +51,77 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Integer, ObjectNo
.cleanupFullSnapshot()
.build();
final MapStateDescriptor<Address, ObjectNodeInfo> sipDescriptor =
new MapStateDescriptor<>("sip-state", Address.class, ObjectNodeInfo.class);
sipDescriptor.enableTimeToLive(ttlConfig);
sipDoubleDirState = context.getMapState(sipDescriptor);
final MapStateDescriptor<Address, ObjectNodeInfo> rtpDescriptor =
new MapStateDescriptor<>("rtp-state", Address.class, ObjectNodeInfo.class);
rtpDescriptor.enableTimeToLive(ttlConfig);
rtpState = context.getMapState(rtpDescriptor);
sipDescriptor.enableTimeToLive(ttlConfig);
rtpState = context.getState(rtpDescriptor);
}
@Override
public void processElement2(ObjectNode obj,
KeyedCoProcessFunction<Integer, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final SIPRecord record = new SIPRecord(obj);
final Address address = Address.of(
Tuple2.of(record.getOriginatorSdpConnectIp(), record.getOriginatorSdpMediaPort()),
Tuple2.of(record.getResponderSdpConnectIp(), record.getResponderSdpMediaPort()));
sipDoubleDirState.put(address, new ObjectNodeInfo(obj,
sipDoubleDirState.update(new ObjectNodeInfo(obj,
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds()));
registerNextFireTimestamp(ctx.timerService(), ctx.getCurrentKey().hashCode());
}
@Override
public void processElement1(ObjectNode obj,
KeyedCoProcessFunction<Integer, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
final Address address = Address.of(Tuple2.of(record.getServerIp(), record.getServerPort()),
Tuple2.of(record.getClientIp(), record.getClientPort()));
if (sipDoubleDirState.contains(address)) {
final ObjectNodeInfo info = sipDoubleDirState.get(address);
final ObjectNodeInfo info = sipDoubleDirState.value();
if (null != info) {
record.merge(info.getObj())
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(obj);
// In the context of VoIP fusion, only one RTP double directional stream
if (record.getStreamDir() == StreamDir.DOUBLE.getValue()) {
sipDoubleDirState.remove(address);
if (record.getStreamDir() == StreamDir.DOUBLE.getValue()
|| info.getTimes() >= MAX_RTP_LINES - 1) {
sipDoubleDirState.clear();
} else { // Save the number of fused RTP unidirectional streams
info.setTimes(info.getTimes() + 1);
sipDoubleDirState.put(address, info);
sipDoubleDirState.update(info);
}
} else {
rtpState.put(address, new ObjectNodeInfo(obj,
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds()));
rtpState.update(new ObjectNodeInfo(obj,
ctx.timerService()
.currentProcessingTime() + fireInterval.toMilliseconds()));
}
registerNextFireTimestamp(ctx.timerService(), ctx.getCurrentKey().hashCode());
}
@Override
public void onTimer(long timestamp,
KeyedCoProcessFunction<Integer, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
final Iterator<Map.Entry<Address, ObjectNodeInfo>> iterator = rtpState.iterator();
while (iterator.hasNext()) {
final Map.Entry<Address, ObjectNodeInfo> entry = iterator.next();
final Address address = entry.getKey();
final ObjectNode obj = entry.getValue().getObj();
final ObjectNodeInfo rtpInfo = rtpState.value();
if (rtpInfo != null) {
final ObjectNode obj = rtpInfo.getObj();
final Record record = new Record(obj);
if (sipDoubleDirState.contains(address)) {
final ObjectNodeInfo info = sipDoubleDirState.get(address);
if (record.getStreamDir() == StreamDir.DOUBLE.getValue()
|| info.getTimes() >= MAX_RTP_LINES - 1
/* One RTP unidirectional stream has already been fused*/) {
sipDoubleDirState.remove(address);
}
record.merge(info.getObj())
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(obj);
record.merge(obj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(obj);
if (record.getStreamDir() == StreamDir.DOUBLE.getValue()
|| rtpInfo.getTimes() >= MAX_RTP_LINES - 1) {
sipDoubleDirState.clear();
} else { // Save the number of fused RTP unidirectional streams
rtpInfo.setTimes(rtpInfo.getTimes() + 1);
sipDoubleDirState.update(rtpInfo);
}
if (entry.getValue().getExpireTime() <= timestamp) {
out.collect(entry.getValue().getObj());
iterator.remove();
if (rtpInfo.getExpireTime() <= timestamp) {
rtpState.clear();
}
}
final Iterator<Map.Entry<Address, ObjectNodeInfo>> sipIterator = sipDoubleDirState.iterator();
while (sipIterator.hasNext()) {
final Map.Entry<Address, ObjectNodeInfo> entry = sipIterator.next();
if (entry.getValue().getExpireTime() <= timestamp) {
sipIterator.remove();
}
final ObjectNodeInfo sipInfo = sipDoubleDirState.value();
if (sipInfo != null && sipInfo.getExpireTime() <= timestamp) {
sipDoubleDirState.clear();
}
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + INTERVAL);
registerIntervalFireTimestamp(ctx.timerService());
}
}

View File

@@ -9,7 +9,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -33,7 +33,7 @@ public class SIPPairingFunctionTest {
@BeforeAll
static void setUp() throws Exception {
final SIPPairingFunction func = new SIPPairingFunction();
final KeyedProcessOperator<Tuple2<Integer, String>, ObjectNode, ObjectNode> operator =
final KeyedProcessOperator<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode> operator =
new KeyedProcessOperator<>(func);
final TypeInformation<Integer> type = TypeInformation.of(Integer.class);
final KeySelector<ObjectNode, Integer> keySelector = jsonNodes -> 0;