diff --git a/src/it/java/com/zdjizhi/flink/voip/FusionTest.java b/src/it/java/com/zdjizhi/flink/voip/FusionTest.java new file mode 100644 index 0000000..640cbe8 --- /dev/null +++ b/src/it/java/com/zdjizhi/flink/voip/FusionTest.java @@ -0,0 +1,95 @@ +package com.zdjizhi.flink.voip; + +import com.zdjizhi.flink.voip.functions.DataGenSource; +import com.zdjizhi.flink.voip.functions.SIPPairingFunction; +import com.zdjizhi.flink.voip.functions.TypeSplitFunction; +import com.zdjizhi.flink.voip.functions.VoIPFusionFunction; +import com.zdjizhi.flink.voip.records.Record; +import com.zdjizhi.flink.voip.records.SIPRecord; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.ttl.mock.MockStateBackend; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Objects; + +// Integration test main class +public class FusionTest { + + public static void main(String[] args) throws Exception { + final Configuration envConfig = new Configuration(); + envConfig.setInteger("rest.port", 8081); + final StreamExecutionEnvironment env = StreamExecutionEnvironment + .createLocalEnvironmentWithWebUI(envConfig); + + final ParameterTool tool = ParameterTool.fromPropertiesFile( + FusionTest.class.getResourceAsStream("/application-test.properties")); + + final CheckpointConfig checkpointConfig = env.getCheckpointConfig(); + env.enableCheckpointing(Time.minutes(1) + .toMilliseconds(), CheckpointingMode.AT_LEAST_ONCE); + checkpointConfig.setCheckpointTimeout(Time.minutes(1).toMilliseconds()); + checkpointConfig.setCheckpointStorage( + Objects.requireNonNull(FusionTest.class.getResource("/")).toString()); + + final Configuration config = tool.getConfiguration(); + env.getConfig().setGlobalJobParameters(config); + env.setParallelism(1); + + final DataStreamSource sourceStream = env.addSource(new DataGenSource()); + + // Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data + final SingleOutputStreamOperator splitsRecordsOperator = + sourceStream + .process(new TypeSplitFunction()) + .name("SplitsRecordsBasedSchemaType") + .uid("splits-records-based-schema-type"); + + // Get the DataStreams for SIP and RTP data from the side outputs. + final DataStream sipDataStream = splitsRecordsOperator + .getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG); + + final DataStream rtpDataStream = splitsRecordsOperator + .getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG); + + final KeySelector> sipKeySelector = new KeySelector<>() { + @Override + public Tuple2 getKey(ObjectNode obj) throws Exception { + final SIPRecord record = new SIPRecord(obj); + return Tuple2.of(record.getVSysID(), record.getCallID()); + } + }; + // Process SIP data to create a double directional stream using SIPPairingFunction. + final SingleOutputStreamOperator sipDoubleDirOperator = sipDataStream + .keyBy(sipKeySelector) + .process(new SIPPairingFunction()) + .name("PairingOneWayToDoubleStream") + .uid("pairing-one-way-to-double"); + + final KeySelector vSysSelector = obj -> { + final Record record = new Record(obj); + return record.getVSysID(); + }; + + // Fusion SIP data and RTP data to VoIP data. + final SingleOutputStreamOperator voIpOperator = rtpDataStream + .keyBy(vSysSelector) + .connect(sipDoubleDirOperator.keyBy(vSysSelector)) + .process(new VoIPFusionFunction()) + .name("VoIPFusion") + .uid("voip-fusion"); + + voIpOperator.print(); + + env.execute("VoIP Fusion Job"); + } +} diff --git a/src/it/java/com/zdjizhi/flink/voip/conf/TestConfigs.java b/src/it/java/com/zdjizhi/flink/voip/conf/TestConfigs.java new file mode 100644 index 0000000..f1b1b6c --- /dev/null +++ b/src/it/java/com/zdjizhi/flink/voip/conf/TestConfigs.java @@ -0,0 +1,22 @@ +package com.zdjizhi.flink.voip.conf; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +// Test Configs. +public class TestConfigs { + + // Ratio of valuable data in the generated dataset + public static final ConfigOption VALUABLE_DATA_PROPORTION = + ConfigOptions.key("valuable.data.ratio") + .intType() + .defaultValue(40) + .withDescription("Ratio of valuable data in the generated dataset."); + + // QPS of generate date record + public static final ConfigOption DATA_GENERATE_RATE = + ConfigOptions.key("data.generate.rate") + .longType() + .defaultValue(1000L) + .withDescription("QPS of generate date record."); +} diff --git a/src/it/java/com/zdjizhi/flink/voip/functions/DataGenSource.java b/src/it/java/com/zdjizhi/flink/voip/functions/DataGenSource.java new file mode 100644 index 0000000..a0a6e61 --- /dev/null +++ b/src/it/java/com/zdjizhi/flink/voip/functions/DataGenSource.java @@ -0,0 +1,44 @@ +package com.zdjizhi.flink.voip.functions; + +import com.zdjizhi.flink.voip.conf.TestConfigs; +import com.zdjizhi.flink.voip.data.RTPGenerator; +import com.zdjizhi.flink.voip.data.SIPGenerator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +// Date Generate Source. +public class DataGenSource extends RichSourceFunction implements FunctionHelper { + + private transient SIPGenerator sipGenerator; + private transient RTPGenerator rtpGenerator; + private transient RateLimiter rateLimiter; + private volatile boolean running; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + Integer ratio = getGlobalConfiguration() + .get(TestConfigs.VALUABLE_DATA_PROPORTION); + this.sipGenerator = new SIPGenerator(ratio); + this.rtpGenerator = new RTPGenerator(ratio, sipGenerator); + this.rateLimiter = RateLimiter.create(getGlobalConfiguration() + .get(TestConfigs.DATA_GENERATE_RATE)); + this.running = true; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (running) { + rateLimiter.acquire(); + ctx.collect(sipGenerator.next()); + ctx.collect(rtpGenerator.next()); + } + } + + @Override + public void cancel() { + this.running = false; + } +}