diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java index 1c5fc08..5248c90 100644 --- a/src/main/java/com/zdjizhi/FileChunkCombiner.java +++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java @@ -42,7 +42,7 @@ public class FileChunkCombiner { .name("Kafka Source") .map(new ParseMessagePackMapFunction()) .name("Map: Parse Message Pack") - .filter(new FileChunkFilter(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION))) + .filter(new FileChunkFilterFunction(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION))) .assignTimestampsAndWatermarks(watermarkStrategy); OutputTag delayedChunkOutputTag = new OutputTag("delayed-chunk") { @@ -82,5 +82,4 @@ public class FileChunkCombiner { environment.execute(configuration.get(Configs.FLINK_JOB_NAME)); } - -} \ No newline at end of file +} diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java index 5eb134d..46f9eaa 100644 --- a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java +++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java @@ -15,9 +15,9 @@ import java.util.*; public class CombineChunkProcessWindowFunction extends ProcessWindowFunction { private transient Counter duplicateChunkCounter; - private transient Counter combineErrorCounter; - private transient Counter seekChunkCounter; - private transient Counter appendChunkCounter; + public transient Counter combineErrorCounter; + public transient Counter seekChunkCounter; + public transient Counter appendChunkCounter; private final Configuration configuration; public CombineChunkProcessWindowFunction(Configuration configuration) { @@ -41,4 +41,4 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction { +public class FileChunkFilterFunction extends RichFilterFunction { private final long maxFileSize; private final String filterExpression; private transient Counter filterChunkCounter; private JexlExpression jexlExpression; private JexlContext jexlContext; - public FileChunkFilter(long maxFileSize, String filterExpression) { + public FileChunkFilterFunction(long maxFileSize, String filterExpression) { this.maxFileSize = maxFileSize; this.filterExpression = filterExpression; } diff --git a/src/main/java/com/zdjizhi/pojo/FileChunk.java b/src/main/java/com/zdjizhi/pojo/FileChunk.java index 99076f8..4fa14d9 100644 --- a/src/main/java/com/zdjizhi/pojo/FileChunk.java +++ b/src/main/java/com/zdjizhi/pojo/FileChunk.java @@ -22,6 +22,30 @@ public class FileChunk implements Serializable { public FileChunk() { } + public FileChunk(String uuid, String fileType, long length, byte[] chunk, String combineMode, int chunkCount, long timestamp, Map meta, String chunkNumbers) { + this.uuid = uuid; + this.fileType = fileType; + this.length = length; + this.chunk = chunk; + this.combineMode = combineMode; + this.chunkCount = chunkCount; + this.timestamp = timestamp; + this.meta = meta; + this.chunkNumbers = chunkNumbers; + } + + public FileChunk(String uuid, String fileType, long offset, long length, byte[] chunk, String combineMode, int lastChunkFlag, int chunkCount, long timestamp) { + this.uuid = uuid; + this.fileType = fileType; + this.offset = offset; + this.length = length; + this.chunk = chunk; + this.combineMode = combineMode; + this.lastChunkFlag = lastChunkFlag; + this.chunkCount = chunkCount; + this.timestamp = timestamp; + } + public String getChunkNumbers() { return chunkNumbers; } diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java index 5fed4f4..bf6a494 100644 --- a/src/main/java/com/zdjizhi/sink/HBaseSink.java +++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java @@ -28,10 +28,10 @@ public class HBaseSink extends RichSinkFunction { private static final Log LOG = LogFactory.get(); private final Configuration configuration; - private transient Counter sendHBaseCounter; - private transient Counter sendHBaseErrorCounter; - private transient Counter sendHBaseFileCounter; - private transient Counter sendHBaseChunkCounter; + public transient Counter sendHBaseCounter; + public transient Counter sendHBaseErrorCounter; + public transient Counter sendHBaseFileCounter; + public transient Counter sendHBaseChunkCounter; private boolean isAsync; private Connection syncHBaseConnection; private AsyncConnection AsyncHBaseConnection; @@ -165,7 +165,7 @@ public class HBaseSink extends RichSinkFunction { } catch (IOException | InterruptedException e) { LOG.error("put chunk to hbase data table error. ", e.getMessage()); sendHBaseErrorCounter.inc(); - }finally { + } finally { dataPutList.clear(); } } @@ -176,7 +176,7 @@ public class HBaseSink extends RichSinkFunction { } catch (IOException | InterruptedException e) { LOG.error("put chunk to hbase index time table error. ", e.getMessage()); sendHBaseErrorCounter.inc(); - }finally { + } finally { indexTimePutList.clear(); } } @@ -187,7 +187,7 @@ public class HBaseSink extends RichSinkFunction { } catch (IOException | InterruptedException e) { LOG.error("put chunk to hbase index filename table error. ", e.getMessage()); sendHBaseErrorCounter.inc(); - }finally { + } finally { indexFilenamePutList.clear(); } } diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java index dd186f4..03e5f46 100644 --- a/src/main/java/com/zdjizhi/sink/HosSink.java +++ b/src/main/java/com/zdjizhi/sink/HosSink.java @@ -35,10 +35,10 @@ public class HosSink extends RichSinkFunction { private static final Log LOG = LogFactory.get(); private final Configuration configuration; - private transient Counter sendHosCounter; - private transient Counter sendHosErrorCounter; - private transient Counter sendHosFileCounter; - private transient Counter sendHosChunkCounter; + public transient Counter sendHosCounter; + public transient Counter sendHosErrorCounter; + public transient Counter sendHosFileCounter; + public transient Counter sendHosChunkCounter; private boolean isAsync; private CloseableHttpClient syncHttpClient; private CloseableHttpAsyncClient asyncHttpClient; diff --git a/src/main/java/com/zdjizhi/trigger/LastChunkOrNoDataInTimeTrigger.java b/src/main/java/com/zdjizhi/trigger/LastChunkOrNoDataInTimeTrigger.java index 2605c88..b474cef 100644 --- a/src/main/java/com/zdjizhi/trigger/LastChunkOrNoDataInTimeTrigger.java +++ b/src/main/java/com/zdjizhi/trigger/LastChunkOrNoDataInTimeTrigger.java @@ -69,4 +69,4 @@ public class LastChunkOrNoDataInTimeTrigger extends Trigge return Math.max(value1, value2); } } -} \ No newline at end of file +} diff --git a/src/main/java/com/zdjizhi/trigger/MultipleTrigger.java b/src/main/java/com/zdjizhi/trigger/MultipleTrigger.java index 22eefe8..08665cc 100644 --- a/src/main/java/com/zdjizhi/trigger/MultipleTrigger.java +++ b/src/main/java/com/zdjizhi/trigger/MultipleTrigger.java @@ -64,4 +64,4 @@ public class MultipleTrigger extends Trigger { trigger.clear(window, ctx); } } -} \ No newline at end of file +} diff --git a/src/main/java/com/zdjizhi/utils/PublicUtil.java b/src/main/java/com/zdjizhi/utils/PublicUtil.java index 6074954..02f35b8 100644 --- a/src/main/java/com/zdjizhi/utils/PublicUtil.java +++ b/src/main/java/com/zdjizhi/utils/PublicUtil.java @@ -1,7 +1,6 @@ package com.zdjizhi.utils; import cn.hutool.core.util.ArrayUtil; -import cn.hutool.core.util.StrUtil; import cn.hutool.crypto.digest.DigestUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -21,7 +20,6 @@ public class PublicUtil { List combinedFileChunkList = new ArrayList<>(); try { List originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList()); - System.out.println(originalFileChunkList); List waitingToCombineChunkList = new ArrayList<>(); if (COMBINE_MODE_SEEK.equals(originalFileChunkList.get(0).getCombineMode())) { seekChunkCounter.inc(); diff --git a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java index 67fab1e..26f5c7b 100644 --- a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java +++ b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java @@ -5,9 +5,12 @@ import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.RandomUtil; import com.zdjizhi.config.Configs; import com.zdjizhi.function.CombineChunkProcessWindowFunction; +import com.zdjizhi.function.FileChunkFilterFunction; import com.zdjizhi.function.FileChunkKeySelector; import com.zdjizhi.function.ParseMessagePackMapFunction; import com.zdjizhi.pojo.FileChunk; +import com.zdjizhi.sink.HBaseSink; +import com.zdjizhi.sink.HosSink; import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger; import com.zdjizhi.trigger.MultipleTrigger; import com.zdjizhi.utils.PublicUtil; @@ -30,12 +33,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.operators.*; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; -import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; @@ -43,6 +44,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -62,7 +64,6 @@ public class FileChunkCombinerTests { private static Counter combineErrorCounter; private static Counter seekChunkCounter; private static Counter appendChunkCounter; - private static Counter sendHosErrorCounter; private File emlFile; private byte[] emlFileBytes; private byte[] pcapngFileBytes; @@ -90,8 +91,7 @@ public class FileChunkCombinerTests { combineErrorCounter = new SimpleCounter(); seekChunkCounter = new SimpleCounter(); appendChunkCounter = new SimpleCounter(); - sendHosErrorCounter = new SimpleCounter(); - maxChunkCount = configuration.get(Configs.COMBINER_WINDOW_KEY_MAX_CHUNK); + maxChunkCount = configuration.get(Configs.FILE_MAX_CHUNK_COUNT); String filePath = "src" + File.separator + "test" + File.separator + "data" + File.separator + "test.eml"; emlFile = new File(filePath); emlFileBytes = FileUtil.readBytes(emlFile); @@ -118,14 +118,188 @@ public class FileChunkCombinerTests { } @Test - public void testParseMessagePack() { - ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(); + public void testParseMessagePackMapFunction() throws Exception { + OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(new ParseMessagePackMapFunction())); + testHarness.setup(); + testHarness.open(); for (byte[] messagePack : messagePackList) { - FileChunk fileChunk = mapFunction.map(messagePack); + testHarness.processElement(new StreamRecord<>(messagePack)); + } + ConcurrentLinkedQueue output = testHarness.getOutput(); + Assert.assertEquals(30, output.size()); + for (Object o : output) { + FileChunk fileChunk = ((StreamRecord) o).getValue(); Assert.assertNotEquals("解析MessagePack数据失败", null, fileChunk.getUuid()); } } + @Test + public void testFileChunkFilterFunction() throws Exception { + StreamFilter fileChunkStreamFilter = new StreamFilter<>(new FileChunkFilterFunction(100000, "FileChunk.fileType == \"eml\"")); + OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamFilter); + testHarness.setup(); + testHarness.open(); + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + for (FileChunk fileChunk : inputFileChunks) { + testHarness.processElement(new StreamRecord<>(fileChunk)); + if ("eml".equals(fileChunk.getFileType())) { + expectedOutput.add(new StreamRecord<>(fileChunk)); + } + } + ConcurrentLinkedQueue actualOutput = testHarness.getOutput(); + Assert.assertEquals(10, actualOutput.size()); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, new Comparator() { + @Override + public int compare(Object o1, Object o2) { + StreamRecord sr0 = (StreamRecord) o1; + StreamRecord sr1 = (StreamRecord) o2; + return Long.compare(((FileChunk) sr0.getValue()).getOffset(), ((FileChunk) sr1.getValue()).getOffset()); + } + }); + testHarness.close(); + } + + @Test + public void testCombineChunkProcessWindowFunction() throws Exception { + //seek模式 + ListStateDescriptor listStateDescriptor = new ListStateDescriptor("test-seek-window", new ListSerializer(new JavaSerializer())); + List> triggers = new ArrayList<>(); + triggers.add(EventTimeTrigger.create()); + triggers.add(LastChunkOrNoDataInTimeTrigger.of(1000)); + Trigger trigger = MultipleTrigger.of(triggers); + CombineChunkProcessWindowFunction processWindowFunction = new CombineChunkProcessWindowFunction(configuration); + WindowOperator operator = new WindowOperator( + TumblingEventTimeWindows.of(Time.seconds(3)), + new TimeWindow.Serializer(), + new FileChunkKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + listStateDescriptor, + new InternalIterableProcessWindowFunction(processWindowFunction), + trigger, + 0L, null); + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new StreamRecord<>(PublicUtil.combine(inputFileChunks.subList(0, 10), maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter).get(0), 2999L)); + KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setup(); + testHarness.open(); + for (FileChunk fileChunk : inputFileChunks.subList(0, 10)) { + testHarness.processElement(fileChunk, 1000L); + } + Assert.assertEquals(10, processWindowFunction.seekChunkCounter.getCount()); + Assert.assertEquals(0, processWindowFunction.combineErrorCounter.getCount()); + ConcurrentLinkedQueue actualOutput = testHarness.getOutput(); + Assert.assertEquals(1, actualOutput.size()); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, (o1, o2) -> { + StreamRecord sr0 = (StreamRecord) o1; + StreamRecord sr1 = (StreamRecord) o2; + return Long.compare(((FileChunk) sr0.getValue()).getOffset(), ((FileChunk) sr1.getValue()).getOffset()); + }); + testHarness.close(); + //append模式 + triggers = new ArrayList<>(); + triggers.add(EventTimeTrigger.create()); + triggers.add(LastChunkOrNoDataInTimeTrigger.of(1000)); + trigger = MultipleTrigger.of(triggers); + listStateDescriptor = new ListStateDescriptor("test-append-window", new ListSerializer(new JavaSerializer())); + processWindowFunction = new CombineChunkProcessWindowFunction(configuration); + operator = new WindowOperator( + TumblingEventTimeWindows.of(Time.seconds(3)), + new TimeWindow.Serializer(), + new FileChunkKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + listStateDescriptor, + new InternalIterableProcessWindowFunction(processWindowFunction), + trigger, + 0L, null); + expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new StreamRecord<>(PublicUtil.combine(inputFileChunks.subList(10, 20), maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter).get(0), 2999L)); + expectedOutput.add(new StreamRecord<>(PublicUtil.combine(inputFileChunks.subList(20, inputFileChunks.size()), maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter).get(0), 2999L)); + testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setup(); + testHarness.open(); + for (FileChunk fileChunk : inputFileChunks.subList(10, inputFileChunks.size())) { + testHarness.processElement(fileChunk, 1000L); + } + testHarness.setProcessingTime(5000L); + Assert.assertEquals(20, processWindowFunction.appendChunkCounter.getCount()); + Assert.assertEquals(0, processWindowFunction.combineErrorCounter.getCount()); + actualOutput = testHarness.getOutput(); + Assert.assertEquals(2, actualOutput.size()); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, (o1, o2) -> { + StreamRecord sr0 = (StreamRecord) o1; + StreamRecord sr1 = (StreamRecord) o2; + return ((FileChunk) sr0.getValue()).getUuid().compareTo(((FileChunk) sr1.getValue()).getUuid()); + }); + testHarness.close(); + } + + @Test + public void testHosSink() throws Exception { + //测试单条上传 + configuration.setString(Configs.SINK_TYPE, "hos"); + configuration.setBoolean(Configs.SINK_BATCH, false); + HosSink hosSink = new HosSink(configuration); + StreamSink fileChunkStreamSink = new StreamSink<>(hosSink); + OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink); + testHarness.setup(); + testHarness.open(); + byte[] data = RandomUtil.randomString(1000).getBytes(); + //seek模式 + FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis()); + testHarness.processElement(new StreamRecord<>(fileChunk)); + Assert.assertEquals("上传文件到hos错误", 0, hosSink.sendHosErrorCounter.getCount()); + Assert.assertEquals("上传文件到hos失败", 1, hosSink.sendHosCounter.getCount()); + Assert.assertEquals(1, hosSink.sendHosFileCounter.getCount()); + Assert.assertEquals(1, hosSink.sendHosChunkCounter.getCount()); + //append模式 + fileChunk = new FileChunk("0000000002", "pcapng", data.length, data, "append", 5, System.currentTimeMillis(), pcapngFileMeta, "1-200,2-200,3-200,4-200,5-200"); + testHarness.processElement(new StreamRecord<>(fileChunk)); + Assert.assertEquals("上传文件到hos错误", 0, hosSink.sendHosErrorCounter.getCount()); + Assert.assertEquals("上传文件到hos次数错误", 2, hosSink.sendHosCounter.getCount()); + Assert.assertEquals(2, hosSink.sendHosChunkCounter.getCount()); + testHarness.close(); + //测试批量上传 + configuration.setString(Configs.SINK_TYPE, "hos"); + configuration.setBoolean(Configs.SINK_BATCH, true); + configuration.setInteger(Configs.SINK_BATCH_COUNT, 2); + hosSink = new HosSink(configuration); + fileChunkStreamSink = new StreamSink<>(hosSink); + testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink); + testHarness.setup(); + testHarness.open(); + fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis()); + testHarness.processElement(new StreamRecord<>(fileChunk)); + fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis()); + testHarness.processElement(new StreamRecord<>(fileChunk)); + Assert.assertEquals("上传文件到hos错误", 0, hosSink.sendHosErrorCounter.getCount()); + Assert.assertEquals("上传文件到hos失败", 1, hosSink.sendHosCounter.getCount()); + Assert.assertEquals(2, hosSink.sendHosFileCounter.getCount()); + Assert.assertEquals(2, hosSink.sendHosChunkCounter.getCount()); + testHarness.close(); + } + + @Test + public void testHBaseSink() throws Exception { + configuration.setString(Configs.SINK_TYPE, "hbase"); + configuration.setBoolean(Configs.SINK_BATCH, true); + configuration.setInteger(Configs.SINK_BATCH_COUNT, 2); + HBaseSink hBaseSink = new HBaseSink(configuration); + StreamSink fileChunkStreamSink = new StreamSink<>(hBaseSink); + OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink); + testHarness.setup(); + testHarness.open(); + byte[] data = RandomUtil.randomString(1000).getBytes(); + FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis()); + testHarness.processElement(new StreamRecord<>(fileChunk)); + fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis()); + testHarness.processElement(new StreamRecord<>(fileChunk)); + Assert.assertEquals("上传文件到hbase错误", 0, hBaseSink.sendHBaseErrorCounter.getCount()); + Assert.assertEquals("上传文件到hbase次数错误", 3, hBaseSink.sendHBaseCounter.getCount()); + Assert.assertEquals(2, hBaseSink.sendHBaseFileCounter.getCount()); + Assert.assertEquals(2, hBaseSink.sendHBaseChunkCounter.getCount()); + testHarness.close(); + } + @Test public void testCombineFullChunk() { categorizeChunks(inputFileChunks); @@ -212,37 +386,6 @@ public class FileChunkCombinerTests { Assert.assertEquals("监控指标错误", pcapngChunkCount * 2 - 2, appendChunkCounter.getCount()); } - @Test - public void testSendToHos() { - byte[] data = RandomUtil.randomString(1000).getBytes(); - //seek模式 - FileChunk fileChunk = new FileChunk(); - fileChunk.setUuid("0000000001"); - fileChunk.setCombineMode("seek"); - fileChunk.setFileType("eml"); - fileChunk.setOffset(0); - fileChunk.setLength(data.length); - fileChunk.setLastChunkFlag(1); - fileChunk.setChunkCount(5); - fileChunk.setTimestamp(System.currentTimeMillis()); - fileChunk.setChunk(data); - PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter); - Assert.assertEquals("上传文件到hos错误", 0, sendHosErrorCounter.getCount()); - //append模式 - fileChunk = new FileChunk(); - fileChunk.setUuid("0000000002"); - fileChunk.setCombineMode("append"); - fileChunk.setFileType("pcapng"); - fileChunk.setLength(data.length); - fileChunk.setChunkCount(5); - fileChunk.setTimestamp(System.currentTimeMillis()); - fileChunk.setChunk(data); - fileChunk.setChunkNumbers("1-200,2-200,3-200,4-200,5-200"); - fileChunk.setMeta(pcapngFileMeta); - PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter); - Assert.assertEquals("上传文件到hos错误", 0, sendHosErrorCounter.getCount()); - } - @Test public void testPipelineFullChunk() throws Exception { CollectSink.values.clear(); @@ -444,40 +587,6 @@ public class FileChunkCombinerTests { } } - @Test - public void testCombineChunkProcessWindowFunction() throws Exception { - ListStateDescriptor listStateDescriptor = new ListStateDescriptor("test-window", new ListSerializer(new JavaSerializer())); - WindowOperator operator = new WindowOperator( - TumblingProcessingTimeWindows.of(Time.seconds(3)), - new TimeWindow.Serializer(), - new FileChunkKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), - listStateDescriptor, - new InternalIterableProcessWindowFunction(new CombineChunkProcessWindowFunction(configuration)), - ProcessingTimeTrigger.create(), - 0L, null); - KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); - testHarness.open(); - testHarness.setProcessingTime(3L); - testHarness.processElement(new StreamRecord<>(inputFileChunks.get(0))); - testHarness.processElement(new StreamRecord<>(inputFileChunks.get(1))); - testHarness.processElement(new StreamRecord<>(inputFileChunks.get(2))); - testHarness.processElement(new StreamRecord<>(inputFileChunks.get(3))); - testHarness.processElement(new StreamRecord<>(inputFileChunks.get(4))); - testHarness.setProcessingTime(5000L); - expectedOutput.add(new StreamRecord<>(PublicUtil.combine(inputFileChunks.subList(0, 5), maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter).get(0), 2999L)); - ConcurrentLinkedQueue actualOutput = testHarness.getOutput(); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, new Comparator() { - @Override - public int compare(Object o1, Object o2) { - StreamRecord sr0 = (StreamRecord) o1; - StreamRecord sr1 = (StreamRecord) o2; - return Long.compare(((FileChunk) sr0.getValue()).getOffset(), ((FileChunk) sr1.getValue()).getOffset()); - } - }); - } - @Test public void testMock() throws Exception { ProcessWindowFunctionMock mock = Mockito.mock(ProcessWindowFunctionMock.class); @@ -563,5 +672,4 @@ public class FileChunkCombinerTests { } } } - }