更新单元测试
This commit is contained in:
@@ -5,6 +5,8 @@ import cn.hutool.core.util.ArrayUtil;
|
|||||||
import cn.hutool.core.util.RandomUtil;
|
import cn.hutool.core.util.RandomUtil;
|
||||||
import com.zdjizhi.config.Configs;
|
import com.zdjizhi.config.Configs;
|
||||||
import com.zdjizhi.function.*;
|
import com.zdjizhi.function.*;
|
||||||
|
import com.zdjizhi.function.map.ParseMessagePackMapFunction;
|
||||||
|
import com.zdjizhi.function.map.SideOutputMapFunction;
|
||||||
import com.zdjizhi.pojo.FileChunk;
|
import com.zdjizhi.pojo.FileChunk;
|
||||||
import com.zdjizhi.sink.HBaseSink;
|
import com.zdjizhi.sink.HBaseSink;
|
||||||
import com.zdjizhi.sink.HosSink;
|
import com.zdjizhi.sink.HosSink;
|
||||||
@@ -49,6 +51,8 @@ import org.mockito.invocation.InvocationOnMock;
|
|||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Paths;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
@@ -66,9 +70,9 @@ public class FileChunkCombinerTests {
|
|||||||
private List<FileChunk> pcapngFileChunks;
|
private List<FileChunk> pcapngFileChunks;
|
||||||
private List<FileChunk> pcapngIncludeMetaFileChunks;
|
private List<FileChunk> pcapngIncludeMetaFileChunks;
|
||||||
private Map<String, Object> pcapngFileMeta;
|
private Map<String, Object> pcapngFileMeta;
|
||||||
private int emlChunkCount = 10;
|
private final int emlChunkCount = 10;
|
||||||
private int pcapngChunkCount = 10;
|
private final int pcapngChunkCount = 10;
|
||||||
private String pcapChunkData = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
|
private final String pcapChunkData = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
|
||||||
private static Configuration configuration;
|
private static Configuration configuration;
|
||||||
private CombineChunkProcessWindowFunction processWindowFunction;
|
private CombineChunkProcessWindowFunction processWindowFunction;
|
||||||
private OutputTag<FileChunk> delayedChunkOutputTag;
|
private OutputTag<FileChunk> delayedChunkOutputTag;
|
||||||
@@ -93,13 +97,13 @@ public class FileChunkCombinerTests {
|
|||||||
emlFileChunks = new ArrayList<>();
|
emlFileChunks = new ArrayList<>();
|
||||||
pcapngFileChunks = new ArrayList<>();
|
pcapngFileChunks = new ArrayList<>();
|
||||||
pcapngIncludeMetaFileChunks = new ArrayList<>();
|
pcapngIncludeMetaFileChunks = new ArrayList<>();
|
||||||
ObjectInputStream messagePacksInputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "messagePacks"));
|
ObjectInputStream messagePacksInputStream = new ObjectInputStream(Files.newInputStream(Paths.get("src" + File.separator + "test" + File.separator + "data" + File.separator + "messagePacks")));
|
||||||
messagePackList = (List<byte[]>) messagePacksInputStream.readObject();
|
messagePackList = (List<byte[]>) messagePacksInputStream.readObject();
|
||||||
messagePacksInputStream.close();
|
messagePacksInputStream.close();
|
||||||
ObjectInputStream fileChunksInputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "fileChunks"));
|
ObjectInputStream fileChunksInputStream = new ObjectInputStream(Files.newInputStream(Paths.get("src" + File.separator + "test" + File.separator + "data" + File.separator + "fileChunks")));
|
||||||
inputFileChunks = (List<FileChunk>) fileChunksInputStream.readObject();
|
inputFileChunks = (List<FileChunk>) fileChunksInputStream.readObject();
|
||||||
fileChunksInputStream.close();
|
fileChunksInputStream.close();
|
||||||
ObjectInputStream filesInputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "files"));
|
ObjectInputStream filesInputStream = new ObjectInputStream(Files.newInputStream(Paths.get("src" + File.separator + "test" + File.separator + "data" + File.separator + "files")));
|
||||||
inputFiles = (List<FileChunk>) filesInputStream.readObject();
|
inputFiles = (List<FileChunk>) filesInputStream.readObject();
|
||||||
filesInputStream.close();
|
filesInputStream.close();
|
||||||
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||||
@@ -125,7 +129,7 @@ public class FileChunkCombinerTests {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testParseMessagePackMapFunction() throws Exception {
|
public void testParseMessagePackMapFunction() throws Exception {
|
||||||
ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(false, Long.MAX_VALUE, "");
|
ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction();
|
||||||
OneInputStreamOperatorTestHarness<byte[], FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(mapFunction));
|
OneInputStreamOperatorTestHarness<byte[], FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(mapFunction));
|
||||||
testHarness.setup();
|
testHarness.setup();
|
||||||
testHarness.open();
|
testHarness.open();
|
||||||
@@ -143,12 +147,12 @@ public class FileChunkCombinerTests {
|
|||||||
StreamRecord sr1 = (StreamRecord) o2;
|
StreamRecord sr1 = (StreamRecord) o2;
|
||||||
return ((FileChunk) sr0.getValue()).getUuid().compareTo(((FileChunk) sr1.getValue()).getUuid());
|
return ((FileChunk) sr0.getValue()).getUuid().compareTo(((FileChunk) sr1.getValue()).getUuid());
|
||||||
});
|
});
|
||||||
Assert.assertEquals(30, mapFunction.parseMessagePacksCounter.getCount());
|
Assert.assertEquals(30, mapFunction.chunksInCounter.getCount());
|
||||||
Assert.assertEquals(0, mapFunction.parseErrorMessagePacksCounter.getCount());
|
Assert.assertEquals(30, mapFunction.chunksOutCounter.getCount());
|
||||||
Assert.assertEquals(0, mapFunction.rateLimitDropChunksCounter.getCount());
|
Assert.assertEquals(0, mapFunction.errorChunksCounter.getCount());
|
||||||
Assert.assertEquals(21, mapFunction.equal0BChunksCounter.getCount());
|
Assert.assertEquals(22, mapFunction.lessThan1KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(1, mapFunction.lessThan1KBChunksCounter.getCount());
|
Assert.assertEquals(8, mapFunction.between1KBAnd3KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(8, mapFunction.between1KBAnd5KBChunksCounter.getCount());
|
Assert.assertEquals(0, mapFunction.between3KBAnd5KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, mapFunction.between5KBAnd10KBChunksCounter.getCount());
|
Assert.assertEquals(0, mapFunction.between5KBAnd10KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, mapFunction.between10KBAnd50KBChunksCounter.getCount());
|
Assert.assertEquals(0, mapFunction.between10KBAnd50KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, mapFunction.between50KBAnd100KBChunksCounter.getCount());
|
Assert.assertEquals(0, mapFunction.between50KBAnd100KBChunksCounter.getCount());
|
||||||
@@ -191,7 +195,7 @@ public class FileChunkCombinerTests {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFileChunkFilterFunction() throws Exception {
|
public void testFileChunkFilterFunction() throws Exception {
|
||||||
FileChunkFilterFunction fileChunkFilterFunction = new FileChunkFilterFunction(Long.MAX_VALUE, "FileChunk.fileType == \"eml\"");
|
FileChunkFilterFunction fileChunkFilterFunction = new FileChunkFilterFunction("FileChunk.fileType == \"eml\"", "test");
|
||||||
StreamFilter<FileChunk> fileChunkStreamFilter = new StreamFilter<>(fileChunkFilterFunction);
|
StreamFilter<FileChunk> fileChunkStreamFilter = new StreamFilter<>(fileChunkFilterFunction);
|
||||||
OneInputStreamOperatorTestHarness<FileChunk, FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamFilter);
|
OneInputStreamOperatorTestHarness<FileChunk, FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamFilter);
|
||||||
testHarness.setup();
|
testHarness.setup();
|
||||||
@@ -232,7 +236,7 @@ public class FileChunkCombinerTests {
|
|||||||
List<Object> actualOutput = new ArrayList<>(testHarness.extractOutputValues());
|
List<Object> actualOutput = new ArrayList<>(testHarness.extractOutputValues());
|
||||||
Assert.assertEquals(3, actualOutput.size());
|
Assert.assertEquals(3, actualOutput.size());
|
||||||
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, Comparator.comparing(o -> ((FileChunk) o).getUuid()));
|
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, Comparator.comparing(o -> ((FileChunk) o).getUuid()));
|
||||||
Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.errorChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
@@ -266,7 +270,7 @@ public class FileChunkCombinerTests {
|
|||||||
}
|
}
|
||||||
Assert.assertEquals(10, sideOutput.size());
|
Assert.assertEquals(10, sideOutput.size());
|
||||||
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedSideOutput, actualSideOutput, Comparator.comparing(o -> ((FileChunk) o).getUuid()));
|
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedSideOutput, actualSideOutput, Comparator.comparing(o -> ((FileChunk) o).getUuid()));
|
||||||
Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.errorChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
@@ -294,7 +298,7 @@ public class FileChunkCombinerTests {
|
|||||||
Assert.assertEquals(inputFiles.get(0), actualOutput.get(0));
|
Assert.assertEquals(inputFiles.get(0), actualOutput.get(0));
|
||||||
Assert.assertEquals(inputFiles.get(1).getChunk().length + pcapngFileChunks.get(5).getChunk().length, actualOutput.get(1).getChunk().length);
|
Assert.assertEquals(inputFiles.get(1).getChunk().length + pcapngFileChunks.get(5).getChunk().length, actualOutput.get(1).getChunk().length);
|
||||||
Assert.assertEquals(inputFiles.get(2).getChunk().length + pcapngIncludeMetaFileChunks.get(5).getChunk().length, actualOutput.get(2).getChunk().length);
|
Assert.assertEquals(inputFiles.get(2).getChunk().length + pcapngIncludeMetaFileChunks.get(5).getChunk().length, actualOutput.get(2).getChunk().length);
|
||||||
Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.errorChunksCounter.getCount());
|
||||||
Assert.assertEquals(1, processWindowFunction.duplicateChunksCounter.getCount());
|
Assert.assertEquals(1, processWindowFunction.duplicateChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
@@ -322,7 +326,7 @@ public class FileChunkCombinerTests {
|
|||||||
Assert.assertEquals(inputFiles.get(0).getChunk().length - emlFileChunks.get(5).getChunk().length, actualOutput.get(0).getChunk().length + actualOutput.get(1).getChunk().length);
|
Assert.assertEquals(inputFiles.get(0).getChunk().length - emlFileChunks.get(5).getChunk().length, actualOutput.get(0).getChunk().length + actualOutput.get(1).getChunk().length);
|
||||||
Assert.assertEquals(inputFiles.get(1).getChunk().length - pcapngFileChunks.get(5).getChunk().length, actualOutput.get(2).getChunk().length);
|
Assert.assertEquals(inputFiles.get(1).getChunk().length - pcapngFileChunks.get(5).getChunk().length, actualOutput.get(2).getChunk().length);
|
||||||
Assert.assertEquals(inputFiles.get(2).getChunk().length - pcapngIncludeMetaFileChunks.get(5).getChunk().length, actualOutput.get(3).getChunk().length);
|
Assert.assertEquals(inputFiles.get(2).getChunk().length - pcapngIncludeMetaFileChunks.get(5).getChunk().length, actualOutput.get(3).getChunk().length);
|
||||||
Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.errorChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
@@ -339,52 +343,57 @@ public class FileChunkCombinerTests {
|
|||||||
testHarness.open();
|
testHarness.open();
|
||||||
byte[] data = RandomUtil.randomString(1000).getBytes();
|
byte[] data = RandomUtil.randomString(1000).getBytes();
|
||||||
//seek文件
|
//seek文件
|
||||||
FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
|
FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000);
|
||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
Assert.assertEquals(1, hosSink.sinkRequestsCounter.getCount());
|
Assert.assertEquals(1, hosSink.chunksInCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.sinkErrorRequestsCounter.getCount());
|
Assert.assertEquals(1, hosSink.chunksOutCounter.getCount());
|
||||||
Assert.assertEquals(1, hosSink.sinkFilesCounter.getCount());
|
Assert.assertEquals(0, hosSink.errorChunksCounter.getCount());
|
||||||
Assert.assertEquals(1, hosSink.sinkChunksCounter.getCount());
|
Assert.assertEquals(1, hosSink.filesCounter.getCount());
|
||||||
//append文件
|
Assert.assertEquals(1, hosSink.lessThan1KBChunksCounter.getCount());
|
||||||
fileChunk = new FileChunk("0000000002", "pcapng", data.length, data, "append", 5, System.currentTimeMillis(), pcapngFileMeta, "1-200,2-200,3-200,4-200,5-200");
|
Assert.assertEquals(0, hosSink.between1KBAnd5KBChunksCounter.getCount());
|
||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
|
||||||
Assert.assertEquals(2, hosSink.sinkRequestsCounter.getCount());
|
|
||||||
Assert.assertEquals(0, hosSink.sinkErrorRequestsCounter.getCount());
|
|
||||||
Assert.assertEquals(1, hosSink.sinkFilesCounter.getCount());
|
|
||||||
Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount());
|
|
||||||
Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount());
|
|
||||||
Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount());
|
|
||||||
Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount());
|
|
||||||
Assert.assertEquals(2, hosSink.lessThan5KBChunksCounter.getCount());
|
|
||||||
Assert.assertEquals(0, hosSink.between5KBAnd10KBChunksCounter.getCount());
|
Assert.assertEquals(0, hosSink.between5KBAnd10KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.between10KBAnd50KBChunksCounter.getCount());
|
Assert.assertEquals(0, hosSink.between10KBAnd100KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.between50KBAnd100KBChunksCounter.getCount());
|
Assert.assertEquals(0, hosSink.between100KBAnd1MBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.greaterThan1MBChunksCounter.getCount());
|
||||||
|
//append文件
|
||||||
|
fileChunk = new FileChunk("0000000002", "pcapng", data.length, data, "append", 5, System.currentTimeMillis() * 1000, pcapngFileMeta, "1-200,2-200,3-200,4-200,5-200");
|
||||||
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
|
Assert.assertEquals(2, hosSink.chunksInCounter.getCount());
|
||||||
|
Assert.assertEquals(2, hosSink.chunksOutCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.errorChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(1, hosSink.filesCounter.getCount());
|
||||||
|
Assert.assertEquals(2, hosSink.lessThan1KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.between1KBAnd5KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.between5KBAnd10KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.between10KBAnd100KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.between100KBAnd1MBChunksCounter.getCount());
|
Assert.assertEquals(0, hosSink.between100KBAnd1MBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.greaterThan1MBChunksCounter.getCount());
|
Assert.assertEquals(0, hosSink.greaterThan1MBChunksCounter.getCount());
|
||||||
|
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
|
|
||||||
//测试批量上传
|
//测试批量上传
|
||||||
data = RandomUtil.randomString(10000).getBytes();
|
data = RandomUtil.randomString(10000).getBytes();
|
||||||
configuration.setString(Configs.SINK_TYPE, "hos");
|
configuration.setString(Configs.SINK_TYPE, "hos");
|
||||||
configuration.setBoolean(Configs.SINK_BATCH, true);
|
configuration.setBoolean(Configs.SINK_BATCH, true);
|
||||||
configuration.setInteger(Configs.SINK_BATCH_COUNT, 2);
|
configuration.setInteger(Configs.SINK_BATCH_COUNT, 10);
|
||||||
|
configuration.setInteger(Configs.SINK_BATCH_TIME, 2);
|
||||||
hosSink = new HosSink(configuration);
|
hosSink = new HosSink(configuration);
|
||||||
fileChunkStreamSink = new StreamSink<>(hosSink);
|
fileChunkStreamSink = new StreamSink<>(hosSink);
|
||||||
testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink);
|
testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink);
|
||||||
testHarness.setup();
|
testHarness.setup();
|
||||||
testHarness.open();
|
testHarness.open();
|
||||||
fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
|
fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000);
|
||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
|
fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000);
|
||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
Assert.assertEquals(1, hosSink.sinkRequestsCounter.getCount());
|
Thread.sleep(configuration.getInteger(Configs.SINK_BATCH_TIME) * 1000L + 1000);
|
||||||
Assert.assertEquals(0, hosSink.sinkErrorRequestsCounter.getCount());
|
Assert.assertEquals(2, hosSink.chunksInCounter.getCount());
|
||||||
Assert.assertEquals(2, hosSink.sinkFilesCounter.getCount());
|
Assert.assertEquals(2, hosSink.chunksOutCounter.getCount());
|
||||||
Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount());
|
Assert.assertEquals(0, hosSink.errorChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.lessThan5KBChunksCounter.getCount());
|
Assert.assertEquals(2, hosSink.filesCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.lessThan1KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.between1KBAnd5KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(2, hosSink.between5KBAnd10KBChunksCounter.getCount());
|
Assert.assertEquals(2, hosSink.between5KBAnd10KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.between10KBAnd50KBChunksCounter.getCount());
|
Assert.assertEquals(0, hosSink.between10KBAnd100KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.between50KBAnd100KBChunksCounter.getCount());
|
|
||||||
Assert.assertEquals(0, hosSink.between100KBAnd1MBChunksCounter.getCount());
|
Assert.assertEquals(0, hosSink.between100KBAnd1MBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.greaterThan1MBChunksCounter.getCount());
|
Assert.assertEquals(0, hosSink.greaterThan1MBChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
@@ -394,7 +403,8 @@ public class FileChunkCombinerTests {
|
|||||||
public void testHBaseSink() throws Exception {
|
public void testHBaseSink() throws Exception {
|
||||||
configuration.setString(Configs.SINK_TYPE, "hbase");
|
configuration.setString(Configs.SINK_TYPE, "hbase");
|
||||||
configuration.setBoolean(Configs.SINK_BATCH, true);
|
configuration.setBoolean(Configs.SINK_BATCH, true);
|
||||||
configuration.setInteger(Configs.SINK_BATCH_COUNT, 2);
|
configuration.setInteger(Configs.SINK_BATCH_COUNT, 10);
|
||||||
|
configuration.setInteger(Configs.SINK_BATCH_TIME, 2);
|
||||||
HBaseSink hBaseSink = new HBaseSink(configuration);
|
HBaseSink hBaseSink = new HBaseSink(configuration);
|
||||||
StreamSink<FileChunk> fileChunkStreamSink = new StreamSink<>(hBaseSink);
|
StreamSink<FileChunk> fileChunkStreamSink = new StreamSink<>(hBaseSink);
|
||||||
OneInputStreamOperatorTestHarness<FileChunk, Object> testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink);
|
OneInputStreamOperatorTestHarness<FileChunk, Object> testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink);
|
||||||
@@ -405,14 +415,15 @@ public class FileChunkCombinerTests {
|
|||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
|
fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
|
||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
Assert.assertEquals(3, hBaseSink.sinkRequestsCounter.getCount());
|
Thread.sleep(configuration.getInteger(Configs.SINK_BATCH_TIME) * 1000L + 1000);
|
||||||
Assert.assertEquals(0, hBaseSink.sinkErrorRequestsCounter.getCount());
|
Assert.assertEquals(2, hBaseSink.chunksInCounter.getCount());
|
||||||
Assert.assertEquals(2, hBaseSink.sinkFilesCounter.getCount());
|
Assert.assertEquals(2, hBaseSink.chunksOutCounter.getCount());
|
||||||
Assert.assertEquals(2, hBaseSink.sinkChunksCounter.getCount());
|
Assert.assertEquals(0, hBaseSink.errorChunksCounter.getCount());
|
||||||
Assert.assertEquals(2, hBaseSink.lessThan5KBChunksCounter.getCount());
|
Assert.assertEquals(2, hBaseSink.filesCounter.getCount());
|
||||||
|
Assert.assertEquals(2, hBaseSink.lessThan1KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hBaseSink.between1KBAnd5KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hBaseSink.between5KBAnd10KBChunksCounter.getCount());
|
Assert.assertEquals(0, hBaseSink.between5KBAnd10KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hBaseSink.between10KBAnd50KBChunksCounter.getCount());
|
Assert.assertEquals(0, hBaseSink.between10KBAnd100KBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hBaseSink.between50KBAnd100KBChunksCounter.getCount());
|
|
||||||
Assert.assertEquals(0, hBaseSink.between100KBAnd1MBChunksCounter.getCount());
|
Assert.assertEquals(0, hBaseSink.between100KBAnd1MBChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, hBaseSink.greaterThan1MBChunksCounter.getCount());
|
Assert.assertEquals(0, hBaseSink.greaterThan1MBChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
@@ -426,7 +437,7 @@ public class FileChunkCombinerTests {
|
|||||||
StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1);
|
StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1);
|
||||||
env.execute();
|
env.execute();
|
||||||
List<FileChunk> fileChunks = CollectSink.values;
|
List<FileChunk> fileChunks = CollectSink.values;
|
||||||
Assert.assertTrue("合并错误,sink输出错误", fileChunks.size() > 0);
|
Assert.assertFalse("合并错误,sink输出错误", fileChunks.isEmpty());
|
||||||
categorizeChunks(fileChunks);
|
categorizeChunks(fileChunks);
|
||||||
byte[] data = new byte[0];
|
byte[] data = new byte[0];
|
||||||
long length = 0;
|
long length = 0;
|
||||||
@@ -479,7 +490,7 @@ public class FileChunkCombinerTests {
|
|||||||
StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1);
|
StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1);
|
||||||
env.execute();
|
env.execute();
|
||||||
List<FileChunk> fileChunks = CollectSink.values;
|
List<FileChunk> fileChunks = CollectSink.values;
|
||||||
Assert.assertTrue("合并错误,sink输出错误", fileChunks.size() > 0);
|
Assert.assertFalse("合并错误,sink输出错误", fileChunks.isEmpty());
|
||||||
categorizeChunks(fileChunks);
|
categorizeChunks(fileChunks);
|
||||||
byte[] data = new byte[0];
|
byte[] data = new byte[0];
|
||||||
long length = 0;
|
long length = 0;
|
||||||
@@ -531,7 +542,7 @@ public class FileChunkCombinerTests {
|
|||||||
StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1);
|
StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1);
|
||||||
env.execute();
|
env.execute();
|
||||||
List<FileChunk> fileChunks = CollectSink.values;
|
List<FileChunk> fileChunks = CollectSink.values;
|
||||||
Assert.assertTrue("合并错误,sink输出错误", fileChunks.size() > 0);
|
Assert.assertFalse("合并错误,sink输出错误", fileChunks.isEmpty());
|
||||||
categorizeChunks(fileChunks);
|
categorizeChunks(fileChunks);
|
||||||
byte[] data = new byte[0];
|
byte[] data = new byte[0];
|
||||||
long length = 0;
|
long length = 0;
|
||||||
@@ -619,44 +630,6 @@ public class FileChunkCombinerTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMock() throws Exception {
|
|
||||||
ProcessWindowFunctionMock mock = Mockito.mock(ProcessWindowFunctionMock.class);
|
|
||||||
InternalIterableProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> windowFunction = new InternalIterableProcessWindowFunction<>(mock);
|
|
||||||
TypeInformation<FileChunk> fileChunkType = PojoTypeInfo.of(FileChunk.class);
|
|
||||||
ExecutionConfig execConf = new ExecutionConfig();
|
|
||||||
execConf.setParallelism(5);
|
|
||||||
StreamingFunctionUtils.setOutputType(windowFunction, fileChunkType, execConf);
|
|
||||||
Mockito.verify(mock).setOutputType(fileChunkType, execConf);
|
|
||||||
Configuration config = new Configuration();
|
|
||||||
windowFunction.open(config);
|
|
||||||
Mockito.verify(mock).open(config);
|
|
||||||
RuntimeContext rCtx = Mockito.mock(RuntimeContext.class);
|
|
||||||
windowFunction.setRuntimeContext(rCtx);
|
|
||||||
(Mockito.verify(mock)).setRuntimeContext(rCtx);
|
|
||||||
TimeWindow w = Mockito.mock(TimeWindow.class);
|
|
||||||
Iterable<FileChunk> i = Mockito.mock(Iterable.class);
|
|
||||||
Collector<FileChunk> c = Mockito.mock(Collector.class);
|
|
||||||
InternalWindowFunction.InternalWindowContext ctx = Mockito.mock(InternalWindowFunction.InternalWindowContext.class);
|
|
||||||
(Mockito.doAnswer(new Answer() {
|
|
||||||
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
|
||||||
ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow>.Context c = (ProcessWindowFunction.Context) invocationOnMock.getArguments()[1];
|
|
||||||
c.currentProcessingTime();
|
|
||||||
c.currentWatermark();
|
|
||||||
c.windowState();
|
|
||||||
c.globalState();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}).when(mock)).process(Mockito.anyString(), Mockito.anyObject(), Mockito.eq(i), Mockito.eq(c));
|
|
||||||
windowFunction.process("", w, ctx, i, c);
|
|
||||||
Mockito.verify(ctx).currentProcessingTime();
|
|
||||||
Mockito.verify(ctx).currentWatermark();
|
|
||||||
Mockito.verify(ctx).windowState();
|
|
||||||
Mockito.verify(ctx).globalState();
|
|
||||||
windowFunction.close();
|
|
||||||
Mockito.verify(mock).close();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class ProcessWindowFunctionMock extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> implements OutputTypeConfigurable<FileChunk> {
|
private static class ProcessWindowFunctionMock extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> implements OutputTypeConfigurable<FileChunk> {
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
@@ -682,8 +655,8 @@ public class FileChunkCombinerTests {
|
|||||||
triggers.add(LastChunkOrNoDataInTimeTrigger.of(windowIdleTime * 1000));
|
triggers.add(LastChunkOrNoDataInTimeTrigger.of(windowIdleTime * 1000));
|
||||||
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
|
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
|
||||||
env.addSource(source)
|
env.addSource(source)
|
||||||
.map(new ParseMessagePackMapFunction(false, Long.MAX_VALUE, ""))
|
.map(new ParseMessagePackMapFunction())
|
||||||
.filter(new FileChunkFilterFunction(Long.MAX_VALUE, ""))
|
.filter(new FileChunkFilterFunction("","test"))
|
||||||
.assignTimestampsAndWatermarks(watermarkStrategy)
|
.assignTimestampsAndWatermarks(watermarkStrategy)
|
||||||
.keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
|
.keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
|
||||||
.window(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
|
.window(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
|
||||||
@@ -705,6 +678,44 @@ public class FileChunkCombinerTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// @Test
|
||||||
|
// public void testMock() throws Exception {
|
||||||
|
// ProcessWindowFunctionMock mock = Mockito.mock(ProcessWindowFunctionMock.class);
|
||||||
|
// InternalIterableProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> windowFunction = new InternalIterableProcessWindowFunction<>(mock);
|
||||||
|
// TypeInformation<FileChunk> fileChunkType = PojoTypeInfo.of(FileChunk.class);
|
||||||
|
// ExecutionConfig execConf = new ExecutionConfig();
|
||||||
|
// execConf.setParallelism(5);
|
||||||
|
// StreamingFunctionUtils.setOutputType(windowFunction, fileChunkType, execConf);
|
||||||
|
// Mockito.verify(mock).setOutputType(fileChunkType, execConf);
|
||||||
|
// Configuration config = new Configuration();
|
||||||
|
// windowFunction.open(config);
|
||||||
|
// Mockito.verify(mock).open(config);
|
||||||
|
// RuntimeContext rCtx = Mockito.mock(RuntimeContext.class);
|
||||||
|
// windowFunction.setRuntimeContext(rCtx);
|
||||||
|
// (Mockito.verify(mock)).setRuntimeContext(rCtx);
|
||||||
|
// TimeWindow w = Mockito.mock(TimeWindow.class);
|
||||||
|
// Iterable<FileChunk> i = Mockito.mock(Iterable.class);
|
||||||
|
// Collector<FileChunk> c = Mockito.mock(Collector.class);
|
||||||
|
// InternalWindowFunction.InternalWindowContext ctx = Mockito.mock(InternalWindowFunction.InternalWindowContext.class);
|
||||||
|
// (Mockito.doAnswer(new Answer() {
|
||||||
|
// public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
|
// ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow>.Context c = (ProcessWindowFunction.Context) invocationOnMock.getArguments()[1];
|
||||||
|
// c.currentProcessingTime();
|
||||||
|
// c.currentWatermark();
|
||||||
|
// c.windowState();
|
||||||
|
// c.globalState();
|
||||||
|
// return null;
|
||||||
|
// }
|
||||||
|
// }).when(mock)).process(Mockito.anyString(), Mockito.anyObject(), Mockito.eq(i), Mockito.eq(c));
|
||||||
|
// windowFunction.process("", w, ctx, i, c);
|
||||||
|
// Mockito.verify(ctx).currentProcessingTime();
|
||||||
|
// Mockito.verify(ctx).currentWatermark();
|
||||||
|
// Mockito.verify(ctx).windowState();
|
||||||
|
// Mockito.verify(ctx).globalState();
|
||||||
|
// windowFunction.close();
|
||||||
|
// Mockito.verify(mock).close();
|
||||||
|
// }
|
||||||
|
|
||||||
// @Test
|
// @Test
|
||||||
// public void testCombineChunkProcessWindowFunction() throws Exception {
|
// public void testCombineChunkProcessWindowFunction() throws Exception {
|
||||||
// List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
|
// List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
|
||||||
|
|||||||
Reference in New Issue
Block a user