diff --git a/src/it/java/com/zdjizhi/flink/voip/FusionTest.java b/src/it/java/com/zdjizhi/flink/voip/FusionTest.java index af17682..7303679 100644 --- a/src/it/java/com/zdjizhi/flink/voip/FusionTest.java +++ b/src/it/java/com/zdjizhi/flink/voip/FusionTest.java @@ -78,7 +78,9 @@ public class FusionTest { .name("VoIPFusion") .uid("voip-fusion"); - voIpOperator.print(); + voIpOperator.addSink(new DoNothingSink()) + .name("DoNothingSink") + .setParallelism(1); env.execute("VoIP Fusion Job"); } diff --git a/src/it/java/com/zdjizhi/flink/voip/functions/DoNothingSink.java b/src/it/java/com/zdjizhi/flink/voip/functions/DoNothingSink.java new file mode 100644 index 0000000..6877f83 --- /dev/null +++ b/src/it/java/com/zdjizhi/flink/voip/functions/DoNothingSink.java @@ -0,0 +1,44 @@ +package com.zdjizhi.flink.voip.functions; + +import com.zdjizhi.flink.voip.records.Record; +import com.zdjizhi.flink.voip.records.SchemaType; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + + +// It dose nothing with the incoming data and simply collects metrics for the number of +// RTP and VoIP records processed per second. +public class DoNothingSink extends RichSinkFunction { + + private transient MeterView numRTPRecordsPreSecond; + private transient MeterView numVoIPRecordsPreSecond; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + RuntimeContext runtimeContext = getRuntimeContext(); + MetricGroup metricGroup = runtimeContext.getMetricGroup(); + numRTPRecordsPreSecond = metricGroup + .meter("numRTPRecordsPreSecond", new MeterView(1)); + numVoIPRecordsPreSecond = metricGroup + .meter("numVoIPRecordsPreSecond", new MeterView(1)); + } + + @Override + public void invoke(ObjectNode obj, Context context) throws Exception { + Record record = new Record(obj); + switch (SchemaType.of(record.getSchemaType())) { + case RTP: + numRTPRecordsPreSecond.markEvent(); + break; + case VOIP: + numVoIPRecordsPreSecond.markEvent(); + break; + default: + } + } +}