diff --git a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java index 2cbefba..d3d30c4 100644 --- a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java +++ b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java @@ -5,6 +5,8 @@ import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.RandomUtil; import com.zdjizhi.config.Configs; import com.zdjizhi.function.*; +import com.zdjizhi.function.map.ParseMessagePackMapFunction; +import com.zdjizhi.function.map.SideOutputMapFunction; import com.zdjizhi.pojo.FileChunk; import com.zdjizhi.sink.HBaseSink; import com.zdjizhi.sink.HosSink; @@ -49,6 +51,8 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.*; +import java.nio.file.Files; +import java.nio.file.Paths; import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; @@ -66,9 +70,9 @@ public class FileChunkCombinerTests { private List pcapngFileChunks; private List pcapngIncludeMetaFileChunks; private Map pcapngFileMeta; - private int emlChunkCount = 10; - private int pcapngChunkCount = 10; - private String pcapChunkData = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + private final int emlChunkCount = 10; + private final int pcapngChunkCount = 10; + private final String pcapChunkData = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; private static Configuration configuration; private CombineChunkProcessWindowFunction processWindowFunction; private OutputTag delayedChunkOutputTag; @@ -93,13 +97,13 @@ public class FileChunkCombinerTests { emlFileChunks = new ArrayList<>(); pcapngFileChunks = new ArrayList<>(); pcapngIncludeMetaFileChunks = new ArrayList<>(); - ObjectInputStream messagePacksInputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "messagePacks")); + ObjectInputStream messagePacksInputStream = new ObjectInputStream(Files.newInputStream(Paths.get("src" + File.separator + "test" + File.separator + "data" + File.separator + "messagePacks"))); messagePackList = (List) messagePacksInputStream.readObject(); messagePacksInputStream.close(); - ObjectInputStream fileChunksInputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "fileChunks")); + ObjectInputStream fileChunksInputStream = new ObjectInputStream(Files.newInputStream(Paths.get("src" + File.separator + "test" + File.separator + "data" + File.separator + "fileChunks"))); inputFileChunks = (List) fileChunksInputStream.readObject(); fileChunksInputStream.close(); - ObjectInputStream filesInputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "files")); + ObjectInputStream filesInputStream = new ObjectInputStream(Files.newInputStream(Paths.get("src" + File.separator + "test" + File.separator + "data" + File.separator + "files"))); inputFiles = (List) filesInputStream.readObject(); filesInputStream.close(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -125,7 +129,7 @@ public class FileChunkCombinerTests { @Test public void testParseMessagePackMapFunction() throws Exception { - ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(false, Long.MAX_VALUE, ""); + ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(); OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(mapFunction)); testHarness.setup(); testHarness.open(); @@ -143,12 +147,12 @@ public class FileChunkCombinerTests { StreamRecord sr1 = (StreamRecord) o2; return ((FileChunk) sr0.getValue()).getUuid().compareTo(((FileChunk) sr1.getValue()).getUuid()); }); - Assert.assertEquals(30, mapFunction.parseMessagePacksCounter.getCount()); - Assert.assertEquals(0, mapFunction.parseErrorMessagePacksCounter.getCount()); - Assert.assertEquals(0, mapFunction.rateLimitDropChunksCounter.getCount()); - Assert.assertEquals(21, mapFunction.equal0BChunksCounter.getCount()); - Assert.assertEquals(1, mapFunction.lessThan1KBChunksCounter.getCount()); - Assert.assertEquals(8, mapFunction.between1KBAnd5KBChunksCounter.getCount()); + Assert.assertEquals(30, mapFunction.chunksInCounter.getCount()); + Assert.assertEquals(30, mapFunction.chunksOutCounter.getCount()); + Assert.assertEquals(0, mapFunction.errorChunksCounter.getCount()); + Assert.assertEquals(22, mapFunction.lessThan1KBChunksCounter.getCount()); + Assert.assertEquals(8, mapFunction.between1KBAnd3KBChunksCounter.getCount()); + Assert.assertEquals(0, mapFunction.between3KBAnd5KBChunksCounter.getCount()); Assert.assertEquals(0, mapFunction.between5KBAnd10KBChunksCounter.getCount()); Assert.assertEquals(0, mapFunction.between10KBAnd50KBChunksCounter.getCount()); Assert.assertEquals(0, mapFunction.between50KBAnd100KBChunksCounter.getCount()); @@ -191,7 +195,7 @@ public class FileChunkCombinerTests { @Test public void testFileChunkFilterFunction() throws Exception { - FileChunkFilterFunction fileChunkFilterFunction = new FileChunkFilterFunction(Long.MAX_VALUE, "FileChunk.fileType == \"eml\""); + FileChunkFilterFunction fileChunkFilterFunction = new FileChunkFilterFunction("FileChunk.fileType == \"eml\"", "test"); StreamFilter fileChunkStreamFilter = new StreamFilter<>(fileChunkFilterFunction); OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamFilter); testHarness.setup(); @@ -232,7 +236,7 @@ public class FileChunkCombinerTests { List actualOutput = new ArrayList<>(testHarness.extractOutputValues()); Assert.assertEquals(3, actualOutput.size()); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, Comparator.comparing(o -> ((FileChunk) o).getUuid())); - Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount()); + Assert.assertEquals(0, processWindowFunction.errorChunksCounter.getCount()); Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount()); testHarness.close(); } @@ -266,7 +270,7 @@ public class FileChunkCombinerTests { } Assert.assertEquals(10, sideOutput.size()); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedSideOutput, actualSideOutput, Comparator.comparing(o -> ((FileChunk) o).getUuid())); - Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount()); + Assert.assertEquals(0, processWindowFunction.errorChunksCounter.getCount()); Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount()); testHarness.close(); } @@ -294,7 +298,7 @@ public class FileChunkCombinerTests { Assert.assertEquals(inputFiles.get(0), actualOutput.get(0)); Assert.assertEquals(inputFiles.get(1).getChunk().length + pcapngFileChunks.get(5).getChunk().length, actualOutput.get(1).getChunk().length); Assert.assertEquals(inputFiles.get(2).getChunk().length + pcapngIncludeMetaFileChunks.get(5).getChunk().length, actualOutput.get(2).getChunk().length); - Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount()); + Assert.assertEquals(0, processWindowFunction.errorChunksCounter.getCount()); Assert.assertEquals(1, processWindowFunction.duplicateChunksCounter.getCount()); testHarness.close(); } @@ -322,7 +326,7 @@ public class FileChunkCombinerTests { Assert.assertEquals(inputFiles.get(0).getChunk().length - emlFileChunks.get(5).getChunk().length, actualOutput.get(0).getChunk().length + actualOutput.get(1).getChunk().length); Assert.assertEquals(inputFiles.get(1).getChunk().length - pcapngFileChunks.get(5).getChunk().length, actualOutput.get(2).getChunk().length); Assert.assertEquals(inputFiles.get(2).getChunk().length - pcapngIncludeMetaFileChunks.get(5).getChunk().length, actualOutput.get(3).getChunk().length); - Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount()); + Assert.assertEquals(0, processWindowFunction.errorChunksCounter.getCount()); Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount()); testHarness.close(); } @@ -339,52 +343,57 @@ public class FileChunkCombinerTests { testHarness.open(); byte[] data = RandomUtil.randomString(1000).getBytes(); //seek文件 - FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis()); + FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000); testHarness.processElement(new StreamRecord<>(fileChunk)); - Assert.assertEquals(1, hosSink.sinkRequestsCounter.getCount()); - Assert.assertEquals(0, hosSink.sinkErrorRequestsCounter.getCount()); - Assert.assertEquals(1, hosSink.sinkFilesCounter.getCount()); - Assert.assertEquals(1, hosSink.sinkChunksCounter.getCount()); - //append文件 - fileChunk = new FileChunk("0000000002", "pcapng", data.length, data, "append", 5, System.currentTimeMillis(), pcapngFileMeta, "1-200,2-200,3-200,4-200,5-200"); - testHarness.processElement(new StreamRecord<>(fileChunk)); - Assert.assertEquals(2, hosSink.sinkRequestsCounter.getCount()); - Assert.assertEquals(0, hosSink.sinkErrorRequestsCounter.getCount()); - Assert.assertEquals(1, hosSink.sinkFilesCounter.getCount()); - Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount()); - Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount()); - Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount()); - Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount()); - Assert.assertEquals(2, hosSink.lessThan5KBChunksCounter.getCount()); + Assert.assertEquals(1, hosSink.chunksInCounter.getCount()); + Assert.assertEquals(1, hosSink.chunksOutCounter.getCount()); + Assert.assertEquals(0, hosSink.errorChunksCounter.getCount()); + Assert.assertEquals(1, hosSink.filesCounter.getCount()); + Assert.assertEquals(1, hosSink.lessThan1KBChunksCounter.getCount()); + Assert.assertEquals(0, hosSink.between1KBAnd5KBChunksCounter.getCount()); Assert.assertEquals(0, hosSink.between5KBAnd10KBChunksCounter.getCount()); - Assert.assertEquals(0, hosSink.between10KBAnd50KBChunksCounter.getCount()); - Assert.assertEquals(0, hosSink.between50KBAnd100KBChunksCounter.getCount()); + Assert.assertEquals(0, hosSink.between10KBAnd100KBChunksCounter.getCount()); + Assert.assertEquals(0, hosSink.between100KBAnd1MBChunksCounter.getCount()); + Assert.assertEquals(0, hosSink.greaterThan1MBChunksCounter.getCount()); + //append文件 + fileChunk = new FileChunk("0000000002", "pcapng", data.length, data, "append", 5, System.currentTimeMillis() * 1000, pcapngFileMeta, "1-200,2-200,3-200,4-200,5-200"); + testHarness.processElement(new StreamRecord<>(fileChunk)); + Assert.assertEquals(2, hosSink.chunksInCounter.getCount()); + Assert.assertEquals(2, hosSink.chunksOutCounter.getCount()); + Assert.assertEquals(0, hosSink.errorChunksCounter.getCount()); + Assert.assertEquals(1, hosSink.filesCounter.getCount()); + Assert.assertEquals(2, hosSink.lessThan1KBChunksCounter.getCount()); + Assert.assertEquals(0, hosSink.between1KBAnd5KBChunksCounter.getCount()); + Assert.assertEquals(0, hosSink.between5KBAnd10KBChunksCounter.getCount()); + Assert.assertEquals(0, hosSink.between10KBAnd100KBChunksCounter.getCount()); Assert.assertEquals(0, hosSink.between100KBAnd1MBChunksCounter.getCount()); Assert.assertEquals(0, hosSink.greaterThan1MBChunksCounter.getCount()); - testHarness.close(); + //测试批量上传 data = RandomUtil.randomString(10000).getBytes(); configuration.setString(Configs.SINK_TYPE, "hos"); configuration.setBoolean(Configs.SINK_BATCH, true); - configuration.setInteger(Configs.SINK_BATCH_COUNT, 2); + configuration.setInteger(Configs.SINK_BATCH_COUNT, 10); + configuration.setInteger(Configs.SINK_BATCH_TIME, 2); hosSink = new HosSink(configuration); fileChunkStreamSink = new StreamSink<>(hosSink); testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink); testHarness.setup(); testHarness.open(); - fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis()); + fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000); testHarness.processElement(new StreamRecord<>(fileChunk)); - fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis()); + fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000); testHarness.processElement(new StreamRecord<>(fileChunk)); - Assert.assertEquals(1, hosSink.sinkRequestsCounter.getCount()); - Assert.assertEquals(0, hosSink.sinkErrorRequestsCounter.getCount()); - Assert.assertEquals(2, hosSink.sinkFilesCounter.getCount()); - Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount()); - Assert.assertEquals(0, hosSink.lessThan5KBChunksCounter.getCount()); + Thread.sleep(configuration.getInteger(Configs.SINK_BATCH_TIME) * 1000L + 1000); + Assert.assertEquals(2, hosSink.chunksInCounter.getCount()); + Assert.assertEquals(2, hosSink.chunksOutCounter.getCount()); + Assert.assertEquals(0, hosSink.errorChunksCounter.getCount()); + Assert.assertEquals(2, hosSink.filesCounter.getCount()); + Assert.assertEquals(0, hosSink.lessThan1KBChunksCounter.getCount()); + Assert.assertEquals(0, hosSink.between1KBAnd5KBChunksCounter.getCount()); Assert.assertEquals(2, hosSink.between5KBAnd10KBChunksCounter.getCount()); - Assert.assertEquals(0, hosSink.between10KBAnd50KBChunksCounter.getCount()); - Assert.assertEquals(0, hosSink.between50KBAnd100KBChunksCounter.getCount()); + Assert.assertEquals(0, hosSink.between10KBAnd100KBChunksCounter.getCount()); Assert.assertEquals(0, hosSink.between100KBAnd1MBChunksCounter.getCount()); Assert.assertEquals(0, hosSink.greaterThan1MBChunksCounter.getCount()); testHarness.close(); @@ -394,7 +403,8 @@ public class FileChunkCombinerTests { public void testHBaseSink() throws Exception { configuration.setString(Configs.SINK_TYPE, "hbase"); configuration.setBoolean(Configs.SINK_BATCH, true); - configuration.setInteger(Configs.SINK_BATCH_COUNT, 2); + configuration.setInteger(Configs.SINK_BATCH_COUNT, 10); + configuration.setInteger(Configs.SINK_BATCH_TIME, 2); HBaseSink hBaseSink = new HBaseSink(configuration); StreamSink fileChunkStreamSink = new StreamSink<>(hBaseSink); OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink); @@ -405,14 +415,15 @@ public class FileChunkCombinerTests { testHarness.processElement(new StreamRecord<>(fileChunk)); fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis()); testHarness.processElement(new StreamRecord<>(fileChunk)); - Assert.assertEquals(3, hBaseSink.sinkRequestsCounter.getCount()); - Assert.assertEquals(0, hBaseSink.sinkErrorRequestsCounter.getCount()); - Assert.assertEquals(2, hBaseSink.sinkFilesCounter.getCount()); - Assert.assertEquals(2, hBaseSink.sinkChunksCounter.getCount()); - Assert.assertEquals(2, hBaseSink.lessThan5KBChunksCounter.getCount()); + Thread.sleep(configuration.getInteger(Configs.SINK_BATCH_TIME) * 1000L + 1000); + Assert.assertEquals(2, hBaseSink.chunksInCounter.getCount()); + Assert.assertEquals(2, hBaseSink.chunksOutCounter.getCount()); + Assert.assertEquals(0, hBaseSink.errorChunksCounter.getCount()); + Assert.assertEquals(2, hBaseSink.filesCounter.getCount()); + Assert.assertEquals(2, hBaseSink.lessThan1KBChunksCounter.getCount()); + Assert.assertEquals(0, hBaseSink.between1KBAnd5KBChunksCounter.getCount()); Assert.assertEquals(0, hBaseSink.between5KBAnd10KBChunksCounter.getCount()); - Assert.assertEquals(0, hBaseSink.between10KBAnd50KBChunksCounter.getCount()); - Assert.assertEquals(0, hBaseSink.between50KBAnd100KBChunksCounter.getCount()); + Assert.assertEquals(0, hBaseSink.between10KBAnd100KBChunksCounter.getCount()); Assert.assertEquals(0, hBaseSink.between100KBAnd1MBChunksCounter.getCount()); Assert.assertEquals(0, hBaseSink.greaterThan1MBChunksCounter.getCount()); testHarness.close(); @@ -426,7 +437,7 @@ public class FileChunkCombinerTests { StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1); env.execute(); List fileChunks = CollectSink.values; - Assert.assertTrue("合并错误,sink输出错误", fileChunks.size() > 0); + Assert.assertFalse("合并错误,sink输出错误", fileChunks.isEmpty()); categorizeChunks(fileChunks); byte[] data = new byte[0]; long length = 0; @@ -479,7 +490,7 @@ public class FileChunkCombinerTests { StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1); env.execute(); List fileChunks = CollectSink.values; - Assert.assertTrue("合并错误,sink输出错误", fileChunks.size() > 0); + Assert.assertFalse("合并错误,sink输出错误", fileChunks.isEmpty()); categorizeChunks(fileChunks); byte[] data = new byte[0]; long length = 0; @@ -531,7 +542,7 @@ public class FileChunkCombinerTests { StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1); env.execute(); List fileChunks = CollectSink.values; - Assert.assertTrue("合并错误,sink输出错误", fileChunks.size() > 0); + Assert.assertFalse("合并错误,sink输出错误", fileChunks.isEmpty()); categorizeChunks(fileChunks); byte[] data = new byte[0]; long length = 0; @@ -619,44 +630,6 @@ public class FileChunkCombinerTests { } } - @Test - public void testMock() throws Exception { - ProcessWindowFunctionMock mock = Mockito.mock(ProcessWindowFunctionMock.class); - InternalIterableProcessWindowFunction windowFunction = new InternalIterableProcessWindowFunction<>(mock); - TypeInformation fileChunkType = PojoTypeInfo.of(FileChunk.class); - ExecutionConfig execConf = new ExecutionConfig(); - execConf.setParallelism(5); - StreamingFunctionUtils.setOutputType(windowFunction, fileChunkType, execConf); - Mockito.verify(mock).setOutputType(fileChunkType, execConf); - Configuration config = new Configuration(); - windowFunction.open(config); - Mockito.verify(mock).open(config); - RuntimeContext rCtx = Mockito.mock(RuntimeContext.class); - windowFunction.setRuntimeContext(rCtx); - (Mockito.verify(mock)).setRuntimeContext(rCtx); - TimeWindow w = Mockito.mock(TimeWindow.class); - Iterable i = Mockito.mock(Iterable.class); - Collector c = Mockito.mock(Collector.class); - InternalWindowFunction.InternalWindowContext ctx = Mockito.mock(InternalWindowFunction.InternalWindowContext.class); - (Mockito.doAnswer(new Answer() { - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - ProcessWindowFunction.Context c = (ProcessWindowFunction.Context) invocationOnMock.getArguments()[1]; - c.currentProcessingTime(); - c.currentWatermark(); - c.windowState(); - c.globalState(); - return null; - } - }).when(mock)).process(Mockito.anyString(), Mockito.anyObject(), Mockito.eq(i), Mockito.eq(c)); - windowFunction.process("", w, ctx, i, c); - Mockito.verify(ctx).currentProcessingTime(); - Mockito.verify(ctx).currentWatermark(); - Mockito.verify(ctx).windowState(); - Mockito.verify(ctx).globalState(); - windowFunction.close(); - Mockito.verify(mock).close(); - } - private static class ProcessWindowFunctionMock extends ProcessWindowFunction implements OutputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -682,8 +655,8 @@ public class FileChunkCombinerTests { triggers.add(LastChunkOrNoDataInTimeTrigger.of(windowIdleTime * 1000)); Trigger trigger = MultipleTrigger.of(triggers); env.addSource(source) - .map(new ParseMessagePackMapFunction(false, Long.MAX_VALUE, "")) - .filter(new FileChunkFilterFunction(Long.MAX_VALUE, "")) + .map(new ParseMessagePackMapFunction()) + .filter(new FileChunkFilterFunction("","test")) .assignTimestampsAndWatermarks(watermarkStrategy) .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO) .window(TumblingEventTimeWindows.of(Time.seconds(windowTime))) @@ -705,6 +678,44 @@ public class FileChunkCombinerTests { } } +// @Test +// public void testMock() throws Exception { +// ProcessWindowFunctionMock mock = Mockito.mock(ProcessWindowFunctionMock.class); +// InternalIterableProcessWindowFunction windowFunction = new InternalIterableProcessWindowFunction<>(mock); +// TypeInformation fileChunkType = PojoTypeInfo.of(FileChunk.class); +// ExecutionConfig execConf = new ExecutionConfig(); +// execConf.setParallelism(5); +// StreamingFunctionUtils.setOutputType(windowFunction, fileChunkType, execConf); +// Mockito.verify(mock).setOutputType(fileChunkType, execConf); +// Configuration config = new Configuration(); +// windowFunction.open(config); +// Mockito.verify(mock).open(config); +// RuntimeContext rCtx = Mockito.mock(RuntimeContext.class); +// windowFunction.setRuntimeContext(rCtx); +// (Mockito.verify(mock)).setRuntimeContext(rCtx); +// TimeWindow w = Mockito.mock(TimeWindow.class); +// Iterable i = Mockito.mock(Iterable.class); +// Collector c = Mockito.mock(Collector.class); +// InternalWindowFunction.InternalWindowContext ctx = Mockito.mock(InternalWindowFunction.InternalWindowContext.class); +// (Mockito.doAnswer(new Answer() { +// public Object answer(InvocationOnMock invocationOnMock) throws Throwable { +// ProcessWindowFunction.Context c = (ProcessWindowFunction.Context) invocationOnMock.getArguments()[1]; +// c.currentProcessingTime(); +// c.currentWatermark(); +// c.windowState(); +// c.globalState(); +// return null; +// } +// }).when(mock)).process(Mockito.anyString(), Mockito.anyObject(), Mockito.eq(i), Mockito.eq(c)); +// windowFunction.process("", w, ctx, i, c); +// Mockito.verify(ctx).currentProcessingTime(); +// Mockito.verify(ctx).currentWatermark(); +// Mockito.verify(ctx).windowState(); +// Mockito.verify(ctx).globalState(); +// windowFunction.close(); +// Mockito.verify(mock).close(); +// } + // @Test // public void testCombineChunkProcessWindowFunction() throws Exception { // List> triggers = new ArrayList<>();