diff --git a/src/it/java/com/zdjizhi/flink/voip/FusionTest.java b/src/it/java/com/zdjizhi/flink/voip/FusionTest.java index 640cbe8..af17682 100644 --- a/src/it/java/com/zdjizhi/flink/voip/FusionTest.java +++ b/src/it/java/com/zdjizhi/flink/voip/FusionTest.java @@ -1,17 +1,11 @@ package com.zdjizhi.flink.voip; -import com.zdjizhi.flink.voip.functions.DataGenSource; -import com.zdjizhi.flink.voip.functions.SIPPairingFunction; -import com.zdjizhi.flink.voip.functions.TypeSplitFunction; -import com.zdjizhi.flink.voip.functions.VoIPFusionFunction; -import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.SIPRecord; +import com.zdjizhi.flink.voip.functions.*; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.ttl.mock.MockStateBackend; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; @@ -27,29 +21,35 @@ public class FusionTest { public static void main(String[] args) throws Exception { final Configuration envConfig = new Configuration(); - envConfig.setInteger("rest.port", 8081); + envConfig.setInteger("rest.port", 18081); final StreamExecutionEnvironment env = StreamExecutionEnvironment .createLocalEnvironmentWithWebUI(envConfig); final ParameterTool tool = ParameterTool.fromPropertiesFile( - FusionTest.class.getResourceAsStream("/application-test.properties")); + args.length > 0 ? args[0] : + Objects.requireNonNull( + FusionTest.class.getResource("/application-test.properties")).getPath() + ); final CheckpointConfig checkpointConfig = env.getCheckpointConfig(); env.enableCheckpointing(Time.minutes(1) .toMilliseconds(), CheckpointingMode.AT_LEAST_ONCE); - checkpointConfig.setCheckpointTimeout(Time.minutes(1).toMilliseconds()); - checkpointConfig.setCheckpointStorage( - Objects.requireNonNull(FusionTest.class.getResource("/")).toString()); + checkpointConfig.setCheckpointTimeout(Time.minutes(2).toMilliseconds()); + checkpointConfig.setCheckpointStorage("file://" + System.getProperty("java.io.tmpdir")); + /*checkpointConfig.setCheckpointStorage( + Objects.requireNonNull( + FusionTest.class.getResource("/")).toString());*/ final Configuration config = tool.getConfiguration(); env.getConfig().setGlobalJobParameters(config); - env.setParallelism(1); + env.setParallelism(8); final DataStreamSource sourceStream = env.addSource(new DataGenSource()); // Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data final SingleOutputStreamOperator splitsRecordsOperator = sourceStream + .name("DataGenSource") .process(new TypeSplitFunction()) .name("SplitsRecordsBasedSchemaType") .uid("splits-records-based-schema-type"); @@ -61,24 +61,14 @@ public class FusionTest { final DataStream rtpDataStream = splitsRecordsOperator .getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG); - final KeySelector> sipKeySelector = new KeySelector<>() { - @Override - public Tuple2 getKey(ObjectNode obj) throws Exception { - final SIPRecord record = new SIPRecord(obj); - return Tuple2.of(record.getVSysID(), record.getCallID()); - } - }; // Process SIP data to create a double directional stream using SIPPairingFunction. final SingleOutputStreamOperator sipDoubleDirOperator = sipDataStream - .keyBy(sipKeySelector) + .keyBy(new SIPKeySelector()) .process(new SIPPairingFunction()) .name("PairingOneWayToDoubleStream") .uid("pairing-one-way-to-double"); - final KeySelector vSysSelector = obj -> { - final Record record = new Record(obj); - return record.getVSysID(); - }; + final KeySelector> vSysSelector = new VSysIDKeySelector(); // Fusion SIP data and RTP data to VoIP data. final SingleOutputStreamOperator voIpOperator = rtpDataStream diff --git a/src/it/resources/application-test.properties b/src/it/resources/application-test.properties index 974a595..8d510b3 100644 --- a/src/it/resources/application-test.properties +++ b/src/it/resources/application-test.properties @@ -1,4 +1,4 @@ sip.state.clear.interval.minutes=1 rtp.state.clear.interval.minutes=10 valuable.data.ratio=20 -data.generate.rate=10000 \ No newline at end of file +data.generate.rate=1000 \ No newline at end of file