style(performance): organize performance testing code
This commit is contained in:
@@ -1,17 +1,11 @@
|
||||
package com.zdjizhi.flink.voip;
|
||||
|
||||
import com.zdjizhi.flink.voip.functions.DataGenSource;
|
||||
import com.zdjizhi.flink.voip.functions.SIPPairingFunction;
|
||||
import com.zdjizhi.flink.voip.functions.TypeSplitFunction;
|
||||
import com.zdjizhi.flink.voip.functions.VoIPFusionFunction;
|
||||
import com.zdjizhi.flink.voip.records.Record;
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord;
|
||||
import com.zdjizhi.flink.voip.functions.*;
|
||||
import org.apache.flink.api.common.time.Time;
|
||||
import org.apache.flink.api.java.functions.KeySelector;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.api.java.utils.ParameterTool;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import org.apache.flink.streaming.api.CheckpointingMode;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
@@ -27,29 +21,35 @@ public class FusionTest {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final Configuration envConfig = new Configuration();
|
||||
envConfig.setInteger("rest.port", 8081);
|
||||
envConfig.setInteger("rest.port", 18081);
|
||||
final StreamExecutionEnvironment env = StreamExecutionEnvironment
|
||||
.createLocalEnvironmentWithWebUI(envConfig);
|
||||
|
||||
final ParameterTool tool = ParameterTool.fromPropertiesFile(
|
||||
FusionTest.class.getResourceAsStream("/application-test.properties"));
|
||||
args.length > 0 ? args[0] :
|
||||
Objects.requireNonNull(
|
||||
FusionTest.class.getResource("/application-test.properties")).getPath()
|
||||
);
|
||||
|
||||
final CheckpointConfig checkpointConfig = env.getCheckpointConfig();
|
||||
env.enableCheckpointing(Time.minutes(1)
|
||||
.toMilliseconds(), CheckpointingMode.AT_LEAST_ONCE);
|
||||
checkpointConfig.setCheckpointTimeout(Time.minutes(1).toMilliseconds());
|
||||
checkpointConfig.setCheckpointStorage(
|
||||
Objects.requireNonNull(FusionTest.class.getResource("/")).toString());
|
||||
checkpointConfig.setCheckpointTimeout(Time.minutes(2).toMilliseconds());
|
||||
checkpointConfig.setCheckpointStorage("file://" + System.getProperty("java.io.tmpdir"));
|
||||
/*checkpointConfig.setCheckpointStorage(
|
||||
Objects.requireNonNull(
|
||||
FusionTest.class.getResource("/")).toString());*/
|
||||
|
||||
final Configuration config = tool.getConfiguration();
|
||||
env.getConfig().setGlobalJobParameters(config);
|
||||
env.setParallelism(1);
|
||||
env.setParallelism(8);
|
||||
|
||||
final DataStreamSource<ObjectNode> sourceStream = env.addSource(new DataGenSource());
|
||||
|
||||
// Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data
|
||||
final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
|
||||
sourceStream
|
||||
.name("DataGenSource")
|
||||
.process(new TypeSplitFunction())
|
||||
.name("SplitsRecordsBasedSchemaType")
|
||||
.uid("splits-records-based-schema-type");
|
||||
@@ -61,24 +61,14 @@ public class FusionTest {
|
||||
final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
|
||||
.getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);
|
||||
|
||||
final KeySelector<ObjectNode, Tuple2<Integer, String>> sipKeySelector = new KeySelector<>() {
|
||||
@Override
|
||||
public Tuple2<Integer, String> getKey(ObjectNode obj) throws Exception {
|
||||
final SIPRecord record = new SIPRecord(obj);
|
||||
return Tuple2.of(record.getVSysID(), record.getCallID());
|
||||
}
|
||||
};
|
||||
// Process SIP data to create a double directional stream using SIPPairingFunction.
|
||||
final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
|
||||
.keyBy(sipKeySelector)
|
||||
.keyBy(new SIPKeySelector())
|
||||
.process(new SIPPairingFunction())
|
||||
.name("PairingOneWayToDoubleStream")
|
||||
.uid("pairing-one-way-to-double");
|
||||
|
||||
final KeySelector<ObjectNode, Integer> vSysSelector = obj -> {
|
||||
final Record record = new Record(obj);
|
||||
return record.getVSysID();
|
||||
};
|
||||
final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();
|
||||
|
||||
// Fusion SIP data and RTP data to VoIP data.
|
||||
final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
sip.state.clear.interval.minutes=1
|
||||
rtp.state.clear.interval.minutes=10
|
||||
valuable.data.ratio=20
|
||||
data.generate.rate=10000
|
||||
data.generate.rate=1000
|
||||
Reference in New Issue
Block a user