test(performance): add test case

This commit is contained in:
chaoc
2023-08-07 17:15:27 +08:00
parent 43dbc50a19
commit 33b9d61711
3 changed files with 161 additions and 0 deletions

View File

@@ -0,0 +1,95 @@
package com.zdjizhi.flink.voip;
import com.zdjizhi.flink.voip.functions.DataGenSource;
import com.zdjizhi.flink.voip.functions.SIPPairingFunction;
import com.zdjizhi.flink.voip.functions.TypeSplitFunction;
import com.zdjizhi.flink.voip.functions.VoIPFusionFunction;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Objects;
// Integration test main class
public class FusionTest {
public static void main(String[] args) throws Exception {
final Configuration envConfig = new Configuration();
envConfig.setInteger("rest.port", 8081);
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(envConfig);
final ParameterTool tool = ParameterTool.fromPropertiesFile(
FusionTest.class.getResourceAsStream("/application-test.properties"));
final CheckpointConfig checkpointConfig = env.getCheckpointConfig();
env.enableCheckpointing(Time.minutes(1)
.toMilliseconds(), CheckpointingMode.AT_LEAST_ONCE);
checkpointConfig.setCheckpointTimeout(Time.minutes(1).toMilliseconds());
checkpointConfig.setCheckpointStorage(
Objects.requireNonNull(FusionTest.class.getResource("/")).toString());
final Configuration config = tool.getConfiguration();
env.getConfig().setGlobalJobParameters(config);
env.setParallelism(1);
final DataStreamSource<ObjectNode> sourceStream = env.addSource(new DataGenSource());
// Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data
final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
sourceStream
.process(new TypeSplitFunction())
.name("SplitsRecordsBasedSchemaType")
.uid("splits-records-based-schema-type");
// Get the DataStreams for SIP and RTP data from the side outputs.
final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG);
final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
.getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);
final KeySelector<ObjectNode, Tuple2<Integer, String>> sipKeySelector = new KeySelector<>() {
@Override
public Tuple2<Integer, String> getKey(ObjectNode obj) throws Exception {
final SIPRecord record = new SIPRecord(obj);
return Tuple2.of(record.getVSysID(), record.getCallID());
}
};
// Process SIP data to create a double directional stream using SIPPairingFunction.
final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
.keyBy(sipKeySelector)
.process(new SIPPairingFunction())
.name("PairingOneWayToDoubleStream")
.uid("pairing-one-way-to-double");
final KeySelector<ObjectNode, Integer> vSysSelector = obj -> {
final Record record = new Record(obj);
return record.getVSysID();
};
// Fusion SIP data and RTP data to VoIP data.
final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
.keyBy(vSysSelector)
.connect(sipDoubleDirOperator.keyBy(vSysSelector))
.process(new VoIPFusionFunction())
.name("VoIPFusion")
.uid("voip-fusion");
voIpOperator.print();
env.execute("VoIP Fusion Job");
}
}

View File

@@ -0,0 +1,22 @@
package com.zdjizhi.flink.voip.conf;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
// Test Configs.
public class TestConfigs {
// Ratio of valuable data in the generated dataset
public static final ConfigOption<Integer> VALUABLE_DATA_PROPORTION =
ConfigOptions.key("valuable.data.ratio")
.intType()
.defaultValue(40)
.withDescription("Ratio of valuable data in the generated dataset.");
// QPS of generate date record
public static final ConfigOption<Long> DATA_GENERATE_RATE =
ConfigOptions.key("data.generate.rate")
.longType()
.defaultValue(1000L)
.withDescription("QPS of generate date record.");
}

View File

@@ -0,0 +1,44 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.TestConfigs;
import com.zdjizhi.flink.voip.data.RTPGenerator;
import com.zdjizhi.flink.voip.data.SIPGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
// Date Generate Source.
public class DataGenSource extends RichSourceFunction<ObjectNode> implements FunctionHelper {
private transient SIPGenerator sipGenerator;
private transient RTPGenerator rtpGenerator;
private transient RateLimiter rateLimiter;
private volatile boolean running;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Integer ratio = getGlobalConfiguration()
.get(TestConfigs.VALUABLE_DATA_PROPORTION);
this.sipGenerator = new SIPGenerator(ratio);
this.rtpGenerator = new RTPGenerator(ratio, sipGenerator);
this.rateLimiter = RateLimiter.create(getGlobalConfiguration()
.get(TestConfigs.DATA_GENERATE_RATE));
this.running = true;
}
@Override
public void run(SourceContext<ObjectNode> ctx) throws Exception {
while (running) {
rateLimiter.acquire();
ctx.collect(sipGenerator.next());
ctx.collect(rtpGenerator.next());
}
}
@Override
public void cancel() {
this.running = false;
}
}