perf: code optimization

This commit is contained in:
chaoc
2023-08-14 10:51:42 +08:00
parent da69688270
commit 6a2e5bacb9
2 changed files with 94 additions and 73 deletions

View File

@@ -25,15 +25,15 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
private transient Time fireInterval;
private transient ValueState<ObjectNodeInfo> valueState;
private transient ValueState<ObjectNode> valueState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
int minutes = getGlobalConfiguration().get(FusionConfigs.SIP_STATE_CLEAR_INTERVAL);
fireInterval = Time.minutes(minutes);
final ValueStateDescriptor<ObjectNodeInfo> descriptor =
new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class);
final ValueStateDescriptor<ObjectNode> descriptor =
new ValueStateDescriptor<>("sip-state", ObjectNode.class);
final StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(fireInterval)
@@ -53,38 +53,30 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
final Record record = new Record(value);
// When SIP is a one-way stream.
if (StreamDir.DOUBLE.getValue() != record.getStreamDir()) {
if (StreamDir.DOUBLE != record.getStreamDir()) {
// If the address is already stored in the mapState and has the opposite stream direction,
// merge the SIP records, change the stream direction to DOUBLE, and output the merged record.
final ObjectNodeInfo info = valueState.value();
if (null != info && new Record(info.getObj()).getStreamDir() != record.getStreamDir()) {
record.merge(info.getObj());
record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
final ObjectNode obj = valueState.value();
if (null != obj && new Record(obj).getStreamDir() != record.getStreamDir()) {
record.merge(obj)
.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
out.collect(value);
valueState.clear();
} else {
// If the address is not yet in the mapState, add it with its expiration time.
final ObjectNodeInfo nodeInfo = new ObjectNodeInfo(value,
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds());
valueState.update(nodeInfo);
// If the address is not yet in the mapState.
valueState.update(value);
}
} else {
// If SIP is a double stream, pairing isn't required, directly output the record.
out.collect(value);
}
registerNextFireTimestamp(ctx.timerService(), ctx.getCurrentKey().hashCode());
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
@Override
public void onTimer(long timestamp,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
final ObjectNodeInfo info = valueState.value();
if (info != null) {
if (info.getExpireTime() <= timestamp) {
valueState.clear();
}
registerIntervalFireTimestamp(ctx.timerService());
}
valueState.clear();
}
}

View File

@@ -5,9 +5,7 @@ import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -15,6 +13,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
/**
* The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic
* for SIP and RTP records. It combines SIP and RTP records belonging to the same session
@@ -29,8 +30,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
private static final int MAX_RTP_LINES = 2;
private transient Time fireInterval;
private transient ValueState<ObjectNodeInfo> sipDoubleDirState;
private transient ValueState<ObjectNodeInfo> rtpState;
private transient ValueState<ObjectNodeInfo> sipState;
private transient MapState<StreamDir, ObjectNode> rtpState;
@Override
public void open(Configuration parameters) throws Exception {
@@ -41,9 +42,6 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
final ValueStateDescriptor<ObjectNodeInfo> sipDescriptor =
new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class);
final ValueStateDescriptor<ObjectNodeInfo> rtpDescriptor =
new ValueStateDescriptor<>("rtp-state", ObjectNodeInfo.class);
final StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(fireInterval)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
@@ -51,78 +49,109 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
.useProcessingTime()
.cleanupFullSnapshot()
.build();
sipDescriptor.enableTimeToLive(ttlConfig);
sipState = context.getState(sipDescriptor);
rtpState = context.getState(rtpDescriptor);
sipDoubleDirState = context.getState(sipDescriptor);
final MapStateDescriptor<StreamDir, ObjectNode> rtpDescriptor =
new MapStateDescriptor<>("rtp-state", StreamDir.class, ObjectNode.class);
final StateTtlConfig rtpTtlConfig = StateTtlConfig
.newBuilder(Time.minutes(minutes + 1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.useProcessingTime()
.cleanupFullSnapshot()
.build();
rtpDescriptor.enableTimeToLive(rtpTtlConfig);
rtpState = context.getMapState(rtpDescriptor);
}
// SIP
@Override
public void processElement2(ObjectNode obj,
public void processElement2(ObjectNode sipObj,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final ObjectNodeInfo info = rtpState.value();
if (info != null) {
final Record record = new Record(info.getObj());
record.merge(obj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(info.getObj());
if (record.getStreamDir() != StreamDir.DOUBLE.getValue()) {
sipDoubleDirState.update(new ObjectNodeInfo(obj,
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds()));
}
} else {
sipDoubleDirState.update(new ObjectNodeInfo(obj,
ctx.timerService().currentProcessingTime() + fireInterval.toMilliseconds()));
final Iterator<Map.Entry<StreamDir, ObjectNode>> iterator = rtpState.iterator();
if (rtpState.isEmpty()) {
sipState.update(new ObjectNodeInfo(sipObj, 0));
}
registerNextFireTimestamp(ctx.timerService(), ctx.getCurrentKey().hashCode());
while (iterator.hasNext()) {
final Map.Entry<StreamDir, ObjectNode> entry = iterator.next();
final ObjectNode rtpObj = entry.getValue();
final Record rtpRecord = new Record(rtpObj);
rtpRecord.merge(sipObj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
switch (entry.getKey()) {
case S2C:
case C2S:
ObjectNodeInfo info = sipState.value();
if (info != null) {
info.incTimes();
if (info.getTimes() >= MAX_RTP_LINES) {
sipState.clear();
} else {
sipState.update(new ObjectNodeInfo(sipObj, info.getTimes()));
}
} else {
sipState.update(new ObjectNodeInfo(sipObj, 1));
}
break;
default:
// Double directional:
// In the context of VoIP fusion, only one RTP double directional stream
sipState.clear();
}
}
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
// RTP
@Override
public void processElement1(ObjectNode obj,
public void processElement1(ObjectNode rtpObj,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
final ObjectNodeInfo info = sipDoubleDirState.value();
final Record rtpRecord = new Record(rtpObj);
final ObjectNodeInfo info = sipState.value();
final StreamDir streamDir = rtpRecord.getStreamDir();
if (null != info) {
record.merge(info.getObj())
rtpRecord.merge(info.getObj())
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(obj);
// In the context of VoIP fusion, only one RTP double directional stream
if (record.getStreamDir() == StreamDir.DOUBLE.getValue()
|| info.getTimes() >= MAX_RTP_LINES - 1) {
sipDoubleDirState.clear();
} else { // Save the number of fused RTP unidirectional streams
info.setTimes(info.getTimes() + 1);
sipDoubleDirState.update(info);
out.collect(rtpObj);
switch (streamDir) {
case C2S:
case S2C:
info.incTimes();
if (info.getTimes() >= MAX_RTP_LINES) {
sipState.clear();
}
break;
default:
// Double
sipState.clear();
}
} else {
rtpState.update(new ObjectNodeInfo(obj,
ctx.timerService()
.currentProcessingTime() + fireInterval.toMilliseconds()));
rtpState.put(streamDir, rtpObj);
}
registerNextFireTimestamp(ctx.timerService(), ctx.getCurrentKey().hashCode());
registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
}
@Override
public void onTimer(long timestamp,
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
final ObjectNodeInfo rtpInfo = rtpState.value();
if (rtpInfo != null && rtpInfo.getExpireTime() <= timestamp) {
out.collect(rtpInfo.getObj());
rtpState.clear();
}
final ObjectNodeInfo sipInfo = sipDoubleDirState.value();
if (sipInfo != null && sipInfo.getExpireTime() <= timestamp) {
sipDoubleDirState.clear();
}
if (rtpInfo != null || sipInfo != null) {
registerIntervalFireTimestamp(ctx.timerService());
for (ObjectNode obj : rtpState.values()) {
out.collect(obj);
}
rtpState.clear();
sipState.clear();
}
}