Optimize code and improve unit tests

houjinchuan
2024-03-05 17:26:52 +08:00
parent 7795ebb318
commit 5be9f84f96
10 changed files with 228 additions and 99 deletions

FileChunkCombiner.java

@@ -42,7 +42,7 @@ public class FileChunkCombiner {
                 .name("Kafka Source")
                 .map(new ParseMessagePackMapFunction())
                 .name("Map: Parse Message Pack")
-                .filter(new FileChunkFilter(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
+                .filter(new FileChunkFilterFunction(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
                 .assignTimestampsAndWatermarks(watermarkStrategy);
         OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {
@@ -82,5 +82,4 @@ public class FileChunkCombiner {
         environment.execute(configuration.get(Configs.FLINK_JOB_NAME));
     }
-
 }

CombineChunkProcessWindowFunction.java

@@ -15,9 +15,9 @@ import java.util.*;
 public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {
     private transient Counter duplicateChunkCounter;
-    private transient Counter combineErrorCounter;
-    private transient Counter seekChunkCounter;
-    private transient Counter appendChunkCounter;
+    public transient Counter combineErrorCounter;
+    public transient Counter seekChunkCounter;
+    public transient Counter appendChunkCounter;
     private final Configuration configuration;
     public CombineChunkProcessWindowFunction(Configuration configuration) {
@@ -41,4 +41,4 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {
             out.collect(fileChunk);
         }
     }
-}
\ No newline at end of file
+}

FileChunkFilter.java → FileChunkFilterFunction.java

@@ -8,14 +8,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
-public class FileChunkFilter extends RichFilterFunction<FileChunk> {
+public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
     private final long maxFileSize;
     private final String filterExpression;
     private transient Counter filterChunkCounter;
     private JexlExpression jexlExpression;
     private JexlContext jexlContext;
-    public FileChunkFilter(long maxFileSize, String filterExpression) {
+    public FileChunkFilterFunction(long maxFileSize, String filterExpression) {
         this.maxFileSize = maxFileSize;
         this.filterExpression = filterExpression;
     }
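
Note: the filter body itself is outside this hunk. As a rough sketch of how the renamed FileChunkFilterFunction presumably evaluates its JEXL expression (the new test below passes FileChunk.fileType == "eml"), here is a standalone demo. The class name JexlFilterDemo is made up; only FileChunk, its setter, and the expression string appear in this commit:

    import org.apache.commons.jexl3.*;
    import com.zdjizhi.pojo.FileChunk;

    public class JexlFilterDemo {
        public static void main(String[] args) {
            JexlEngine engine = new JexlBuilder().create();   // compiled once, e.g. in open()
            JexlExpression expression = engine.createExpression("FileChunk.fileType == \"eml\"");
            JexlContext context = new MapContext();
            FileChunk chunk = new FileChunk();
            chunk.setFileType("eml");
            context.set("FileChunk", chunk);                  // "FileChunk.fileType" resolves via getFileType()
            System.out.println(expression.evaluate(context)); // prints true -> this chunk would pass the filter
        }
    }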

FileChunk.java

@@ -22,6 +22,30 @@ public class FileChunk implements Serializable {
     public FileChunk() {
     }
+    public FileChunk(String uuid, String fileType, long length, byte[] chunk, String combineMode, int chunkCount, long timestamp, Map<String, Object> meta, String chunkNumbers) {
+        this.uuid = uuid;
+        this.fileType = fileType;
+        this.length = length;
+        this.chunk = chunk;
+        this.combineMode = combineMode;
+        this.chunkCount = chunkCount;
+        this.timestamp = timestamp;
+        this.meta = meta;
+        this.chunkNumbers = chunkNumbers;
+    }
+    public FileChunk(String uuid, String fileType, long offset, long length, byte[] chunk, String combineMode, int lastChunkFlag, int chunkCount, long timestamp) {
+        this.uuid = uuid;
+        this.fileType = fileType;
+        this.offset = offset;
+        this.length = length;
+        this.chunk = chunk;
+        this.combineMode = combineMode;
+        this.lastChunkFlag = lastChunkFlag;
+        this.chunkCount = chunkCount;
+        this.timestamp = timestamp;
+    }
     public String getChunkNumbers() {
         return chunkNumbers;
     }
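
The two added constructors line up with the two combine modes exercised in the new tests further down: the variant taking offset and lastChunkFlag builds a seek-mode chunk, while the variant taking meta and chunkNumbers describes an append-mode file. A usage sketch with illustrative values, mirroring the tests (the "1-200,…" chunkNumbers format is taken verbatim from them):

    byte[] data = "chunk-bytes".getBytes();
    Map<String, Object> meta = new HashMap<>();   // arbitrary file metadata

    // seek mode: chunk addressed by offset, with a last-chunk flag and expected chunk count
    FileChunk seekChunk = new FileChunk("0000000001", "eml", 0, data.length, data,
            "seek", 1, 5, System.currentTimeMillis());

    // append mode: combined result carrying metadata and the per-chunk list
    FileChunk appendChunk = new FileChunk("0000000002", "pcapng", data.length, data,
            "append", 5, System.currentTimeMillis(), meta, "1-200,2-200,3-200,4-200,5-200");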

HBaseSink.java

@@ -28,10 +28,10 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
     private static final Log LOG = LogFactory.get();
     private final Configuration configuration;
-    private transient Counter sendHBaseCounter;
-    private transient Counter sendHBaseErrorCounter;
-    private transient Counter sendHBaseFileCounter;
-    private transient Counter sendHBaseChunkCounter;
+    public transient Counter sendHBaseCounter;
+    public transient Counter sendHBaseErrorCounter;
+    public transient Counter sendHBaseFileCounter;
+    public transient Counter sendHBaseChunkCounter;
     private boolean isAsync;
     private Connection syncHBaseConnection;
     private AsyncConnection AsyncHBaseConnection;
@@ -165,7 +165,7 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
         } catch (IOException | InterruptedException e) {
             LOG.error("put chunk to hbase data table error. ", e.getMessage());
             sendHBaseErrorCounter.inc();
-        }finally {
+        } finally {
             dataPutList.clear();
         }
     }
@@ -176,7 +176,7 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
         } catch (IOException | InterruptedException e) {
             LOG.error("put chunk to hbase index time table error. ", e.getMessage());
             sendHBaseErrorCounter.inc();
-        }finally {
+        } finally {
             indexTimePutList.clear();
         }
     }
@@ -187,7 +187,7 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
         } catch (IOException | InterruptedException e) {
             LOG.error("put chunk to hbase index filename table error. ", e.getMessage());
             sendHBaseErrorCounter.inc();
-        }finally {
+        } finally {
             indexFilenamePutList.clear();
         }
     }
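
All three flush paths touched above share one batch pattern: write the buffered puts, count an error on failure, and always clear the buffer in finally so a failed batch is not resubmitted forever. A sketch of that shape; writeBatch and flushDataTable are placeholder names, since the actual write call is not shown in this hunk:

    // Placeholder for the real HBase write, which may block or be interrupted.
    private void writeBatch(List<Put> puts) throws IOException, InterruptedException { /* ... */ }

    private void flushDataTable() {
        try {
            writeBatch(dataPutList);
        } catch (IOException | InterruptedException e) {
            LOG.error("put chunk to hbase data table error. ", e.getMessage());
            sendHBaseErrorCounter.inc();
        } finally {
            dataPutList.clear();   // drop the batch whether or not the write succeeded
        }
    }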

HosSink.java

@@ -35,10 +35,10 @@ public class HosSink extends RichSinkFunction<FileChunk> {
     private static final Log LOG = LogFactory.get();
     private final Configuration configuration;
-    private transient Counter sendHosCounter;
-    private transient Counter sendHosErrorCounter;
-    private transient Counter sendHosFileCounter;
-    private transient Counter sendHosChunkCounter;
+    public transient Counter sendHosCounter;
+    public transient Counter sendHosErrorCounter;
+    public transient Counter sendHosFileCounter;
+    public transient Counter sendHosChunkCounter;
     private boolean isAsync;
     private CloseableHttpClient syncHttpClient;
     private CloseableHttpAsyncClient asyncHttpClient;

LastChunkOrNoDataInTimeTrigger.java

@@ -69,4 +69,4 @@ public class LastChunkOrNoDataInTimeTrigger<W extends TimeWindow> extends Trigger
             return Math.max(value1, value2);
         }
     }
-}
\ No newline at end of file
+}

MultipleTrigger.java

@@ -64,4 +64,4 @@ public class MultipleTrigger<T, W extends Window> extends Trigger<T, W> {
             trigger.clear(window, ctx);
         }
     }
-}
\ No newline at end of file
+}
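
For reference, the new tests compose the two triggers above so a window fires either on the event-time boundary or as soon as the last chunk arrives (or no data shows up within 1000 ms):

    List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
    triggers.add(EventTimeTrigger.create());
    triggers.add(LastChunkOrNoDataInTimeTrigger.of(1000));
    Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);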

PublicUtil.java

@@ -1,7 +1,6 @@
 package com.zdjizhi.utils;
 import cn.hutool.core.util.ArrayUtil;
-import cn.hutool.core.util.StrUtil;
 import cn.hutool.crypto.digest.DigestUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
@@ -21,7 +20,6 @@ public class PublicUtil {
         List<FileChunk> combinedFileChunkList = new ArrayList<>();
         try {
             List<FileChunk> originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList());
-            System.out.println(originalFileChunkList);
             List<byte[]> waitingToCombineChunkList = new ArrayList<>();
             if (COMBINE_MODE_SEEK.equals(originalFileChunkList.get(0).getCombineMode())) {
                 seekChunkCounter.inc();

FileChunkCombinerTests.java

@@ -5,9 +5,12 @@ import cn.hutool.core.util.ArrayUtil;
 import cn.hutool.core.util.RandomUtil;
 import com.zdjizhi.config.Configs;
 import com.zdjizhi.function.CombineChunkProcessWindowFunction;
+import com.zdjizhi.function.FileChunkFilterFunction;
 import com.zdjizhi.function.FileChunkKeySelector;
 import com.zdjizhi.function.ParseMessagePackMapFunction;
 import com.zdjizhi.pojo.FileChunk;
+import com.zdjizhi.sink.HBaseSink;
+import com.zdjizhi.sink.HosSink;
 import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger;
 import com.zdjizhi.trigger.MultipleTrigger;
 import com.zdjizhi.utils.PublicUtil;
@@ -30,12 +33,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.operators.*;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
@@ -43,6 +44,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -62,7 +64,6 @@ public class FileChunkCombinerTests {
     private static Counter combineErrorCounter;
     private static Counter seekChunkCounter;
     private static Counter appendChunkCounter;
-    private static Counter sendHosErrorCounter;
     private File emlFile;
     private byte[] emlFileBytes;
     private byte[] pcapngFileBytes;
@@ -90,8 +91,7 @@ public class FileChunkCombinerTests {
         combineErrorCounter = new SimpleCounter();
         seekChunkCounter = new SimpleCounter();
         appendChunkCounter = new SimpleCounter();
-        sendHosErrorCounter = new SimpleCounter();
-        maxChunkCount = configuration.get(Configs.COMBINER_WINDOW_KEY_MAX_CHUNK);
+        maxChunkCount = configuration.get(Configs.FILE_MAX_CHUNK_COUNT);
         String filePath = "src" + File.separator + "test" + File.separator + "data" + File.separator + "test.eml";
         emlFile = new File(filePath);
         emlFileBytes = FileUtil.readBytes(emlFile);
@@ -118,14 +118,188 @@ public class FileChunkCombinerTests {
     }
     @Test
-    public void testParseMessagePack() {
-        ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction();
+    public void testParseMessagePackMapFunction() throws Exception {
+        OneInputStreamOperatorTestHarness<byte[], FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(new ParseMessagePackMapFunction()));
+        testHarness.setup();
+        testHarness.open();
         for (byte[] messagePack : messagePackList) {
-            FileChunk fileChunk = mapFunction.map(messagePack);
+            testHarness.processElement(new StreamRecord<>(messagePack));
+        }
+        ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+        Assert.assertEquals(30, output.size());
+        for (Object o : output) {
+            FileChunk fileChunk = ((StreamRecord<FileChunk>) o).getValue();
             Assert.assertNotEquals("failed to parse MessagePack data", null, fileChunk.getUuid());
         }
     }
+    @Test
+    public void testFileChunkFilterFunction() throws Exception {
+        StreamFilter<FileChunk> fileChunkStreamFilter = new StreamFilter<>(new FileChunkFilterFunction(100000, "FileChunk.fileType == \"eml\""));
+        OneInputStreamOperatorTestHarness<FileChunk, FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamFilter);
+        testHarness.setup();
+        testHarness.open();
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        for (FileChunk fileChunk : inputFileChunks) {
+            testHarness.processElement(new StreamRecord<>(fileChunk));
+            if ("eml".equals(fileChunk.getFileType())) {
+                expectedOutput.add(new StreamRecord<>(fileChunk));
+            }
+        }
+        ConcurrentLinkedQueue<Object> actualOutput = testHarness.getOutput();
+        Assert.assertEquals(10, actualOutput.size());
+        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, new Comparator<Object>() {
+            @Override
+            public int compare(Object o1, Object o2) {
+                StreamRecord sr0 = (StreamRecord) o1;
+                StreamRecord sr1 = (StreamRecord) o2;
+                return Long.compare(((FileChunk) sr0.getValue()).getOffset(), ((FileChunk) sr1.getValue()).getOffset());
+            }
+        });
+        testHarness.close();
+    }
+    @Test
+    public void testCombineChunkProcessWindowFunction() throws Exception {
+        // seek mode
+        ListStateDescriptor listStateDescriptor = new ListStateDescriptor<FileChunk>("test-seek-window", new ListSerializer(new JavaSerializer()));
+        List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
+        triggers.add(EventTimeTrigger.create());
+        triggers.add(LastChunkOrNoDataInTimeTrigger.of(1000));
+        Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
+        CombineChunkProcessWindowFunction processWindowFunction = new CombineChunkProcessWindowFunction(configuration);
+        WindowOperator<String, FileChunk, FileChunk, FileChunk, TimeWindow> operator = new WindowOperator<String, FileChunk, FileChunk, FileChunk, TimeWindow>(
+                TumblingEventTimeWindows.of(Time.seconds(3)),
+                new TimeWindow.Serializer(),
+                new FileChunkKeySelector(),
+                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                listStateDescriptor,
+                new InternalIterableProcessWindowFunction(processWindowFunction),
+                trigger,
+                0L, null);
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+        expectedOutput.add(new StreamRecord<>(PublicUtil.combine(inputFileChunks.subList(0, 10), maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter).get(0), 2999L));
+        KeyedOneInputStreamOperatorTestHarness<String, FileChunk, FileChunk> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+        testHarness.setup();
+        testHarness.open();
+        for (FileChunk fileChunk : inputFileChunks.subList(0, 10)) {
+            testHarness.processElement(fileChunk, 1000L);
+        }
+        Assert.assertEquals(10, processWindowFunction.seekChunkCounter.getCount());
+        Assert.assertEquals(0, processWindowFunction.combineErrorCounter.getCount());
+        ConcurrentLinkedQueue<Object> actualOutput = testHarness.getOutput();
+        Assert.assertEquals(1, actualOutput.size());
+        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, (o1, o2) -> {
+            StreamRecord sr0 = (StreamRecord) o1;
+            StreamRecord sr1 = (StreamRecord) o2;
+            return Long.compare(((FileChunk) sr0.getValue()).getOffset(), ((FileChunk) sr1.getValue()).getOffset());
+        });
+        testHarness.close();
+        // append mode
+        triggers = new ArrayList<>();
+        triggers.add(EventTimeTrigger.create());
+        triggers.add(LastChunkOrNoDataInTimeTrigger.of(1000));
+        trigger = MultipleTrigger.of(triggers);
+        listStateDescriptor = new ListStateDescriptor<FileChunk>("test-append-window", new ListSerializer(new JavaSerializer()));
+        processWindowFunction = new CombineChunkProcessWindowFunction(configuration);
+        operator = new WindowOperator<String, FileChunk, FileChunk, FileChunk, TimeWindow>(
+                TumblingEventTimeWindows.of(Time.seconds(3)),
+                new TimeWindow.Serializer(),
+                new FileChunkKeySelector(),
+                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                listStateDescriptor,
+                new InternalIterableProcessWindowFunction(processWindowFunction),
+                trigger,
+                0L, null);
+        expectedOutput = new ConcurrentLinkedQueue<>();
+        expectedOutput.add(new StreamRecord<>(PublicUtil.combine(inputFileChunks.subList(10, 20), maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter).get(0), 2999L));
+        expectedOutput.add(new StreamRecord<>(PublicUtil.combine(inputFileChunks.subList(20, inputFileChunks.size()), maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter).get(0), 2999L));
+        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+        testHarness.setup();
+        testHarness.open();
+        for (FileChunk fileChunk : inputFileChunks.subList(10, inputFileChunks.size())) {
+            testHarness.processElement(fileChunk, 1000L);
+        }
+        testHarness.setProcessingTime(5000L);
+        Assert.assertEquals(20, processWindowFunction.appendChunkCounter.getCount());
+        Assert.assertEquals(0, processWindowFunction.combineErrorCounter.getCount());
+        actualOutput = testHarness.getOutput();
+        Assert.assertEquals(2, actualOutput.size());
+        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, (o1, o2) -> {
+            StreamRecord sr0 = (StreamRecord) o1;
+            StreamRecord sr1 = (StreamRecord) o2;
+            return ((FileChunk) sr0.getValue()).getUuid().compareTo(((FileChunk) sr1.getValue()).getUuid());
+        });
+        testHarness.close();
+    }
+    @Test
+    public void testHosSink() throws Exception {
+        // test single upload
+        configuration.setString(Configs.SINK_TYPE, "hos");
+        configuration.setBoolean(Configs.SINK_BATCH, false);
+        HosSink hosSink = new HosSink(configuration);
+        StreamSink<FileChunk> fileChunkStreamSink = new StreamSink<>(hosSink);
+        OneInputStreamOperatorTestHarness<FileChunk, Object> testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink);
+        testHarness.setup();
+        testHarness.open();
+        byte[] data = RandomUtil.randomString(1000).getBytes();
+        // seek mode
+        FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
+        testHarness.processElement(new StreamRecord<>(fileChunk));
+        Assert.assertEquals("error uploading file to hos", 0, hosSink.sendHosErrorCounter.getCount());
+        Assert.assertEquals("failed to upload file to hos", 1, hosSink.sendHosCounter.getCount());
+        Assert.assertEquals(1, hosSink.sendHosFileCounter.getCount());
+        Assert.assertEquals(1, hosSink.sendHosChunkCounter.getCount());
+        // append mode
+        fileChunk = new FileChunk("0000000002", "pcapng", data.length, data, "append", 5, System.currentTimeMillis(), pcapngFileMeta, "1-200,2-200,3-200,4-200,5-200");
+        testHarness.processElement(new StreamRecord<>(fileChunk));
+        Assert.assertEquals("error uploading file to hos", 0, hosSink.sendHosErrorCounter.getCount());
+        Assert.assertEquals("wrong hos upload count", 2, hosSink.sendHosCounter.getCount());
+        Assert.assertEquals(2, hosSink.sendHosChunkCounter.getCount());
+        testHarness.close();
+        // test batch upload
+        configuration.setString(Configs.SINK_TYPE, "hos");
+        configuration.setBoolean(Configs.SINK_BATCH, true);
+        configuration.setInteger(Configs.SINK_BATCH_COUNT, 2);
+        hosSink = new HosSink(configuration);
+        fileChunkStreamSink = new StreamSink<>(hosSink);
+        testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink);
+        testHarness.setup();
+        testHarness.open();
+        fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
+        testHarness.processElement(new StreamRecord<>(fileChunk));
+        fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
+        testHarness.processElement(new StreamRecord<>(fileChunk));
+        Assert.assertEquals("error uploading file to hos", 0, hosSink.sendHosErrorCounter.getCount());
+        Assert.assertEquals("failed to upload file to hos", 1, hosSink.sendHosCounter.getCount());
+        Assert.assertEquals(2, hosSink.sendHosFileCounter.getCount());
+        Assert.assertEquals(2, hosSink.sendHosChunkCounter.getCount());
+        testHarness.close();
+    }
+    @Test
+    public void testHBaseSink() throws Exception {
+        configuration.setString(Configs.SINK_TYPE, "hbase");
+        configuration.setBoolean(Configs.SINK_BATCH, true);
+        configuration.setInteger(Configs.SINK_BATCH_COUNT, 2);
+        HBaseSink hBaseSink = new HBaseSink(configuration);
+        StreamSink<FileChunk> fileChunkStreamSink = new StreamSink<>(hBaseSink);
+        OneInputStreamOperatorTestHarness<FileChunk, Object> testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink);
+        testHarness.setup();
+        testHarness.open();
+        byte[] data = RandomUtil.randomString(1000).getBytes();
+        FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
+        testHarness.processElement(new StreamRecord<>(fileChunk));
+        fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
+        testHarness.processElement(new StreamRecord<>(fileChunk));
+        Assert.assertEquals("error uploading file to hbase", 0, hBaseSink.sendHBaseErrorCounter.getCount());
+        Assert.assertEquals("wrong hbase upload count", 3, hBaseSink.sendHBaseCounter.getCount());
+        Assert.assertEquals(2, hBaseSink.sendHBaseFileCounter.getCount());
+        Assert.assertEquals(2, hBaseSink.sendHBaseChunkCounter.getCount());
+        testHarness.close();
+    }
     @Test
     public void testCombineFullChunk() {
         categorizeChunks(inputFileChunks);
@@ -212,37 +386,6 @@ public class FileChunkCombinerTests {
         Assert.assertEquals("wrong metric value", pcapngChunkCount * 2 - 2, appendChunkCounter.getCount());
     }
-    @Test
-    public void testSendToHos() {
-        byte[] data = RandomUtil.randomString(1000).getBytes();
-        // seek mode
-        FileChunk fileChunk = new FileChunk();
-        fileChunk.setUuid("0000000001");
-        fileChunk.setCombineMode("seek");
-        fileChunk.setFileType("eml");
-        fileChunk.setOffset(0);
-        fileChunk.setLength(data.length);
-        fileChunk.setLastChunkFlag(1);
-        fileChunk.setChunkCount(5);
-        fileChunk.setTimestamp(System.currentTimeMillis());
-        fileChunk.setChunk(data);
-        PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter);
-        Assert.assertEquals("error uploading file to hos", 0, sendHosErrorCounter.getCount());
-        // append mode
-        fileChunk = new FileChunk();
-        fileChunk.setUuid("0000000002");
-        fileChunk.setCombineMode("append");
-        fileChunk.setFileType("pcapng");
-        fileChunk.setLength(data.length);
-        fileChunk.setChunkCount(5);
-        fileChunk.setTimestamp(System.currentTimeMillis());
-        fileChunk.setChunk(data);
-        fileChunk.setChunkNumbers("1-200,2-200,3-200,4-200,5-200");
-        fileChunk.setMeta(pcapngFileMeta);
-        PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter);
-        Assert.assertEquals("error uploading file to hos", 0, sendHosErrorCounter.getCount());
-    }
     @Test
     public void testPipelineFullChunk() throws Exception {
         CollectSink.values.clear();
@@ -444,40 +587,6 @@ public class FileChunkCombinerTests {
         }
     }
-    @Test
-    public void testCombineChunkProcessWindowFunction() throws Exception {
-        ListStateDescriptor listStateDescriptor = new ListStateDescriptor<FileChunk>("test-window", new ListSerializer(new JavaSerializer()));
-        WindowOperator<String, FileChunk, FileChunk, FileChunk, TimeWindow> operator = new WindowOperator<String, FileChunk, FileChunk, FileChunk, TimeWindow>(
-                TumblingProcessingTimeWindows.of(Time.seconds(3)),
-                new TimeWindow.Serializer(),
-                new FileChunkKeySelector(),
-                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                listStateDescriptor,
-                new InternalIterableProcessWindowFunction(new CombineChunkProcessWindowFunction(configuration)),
-                ProcessingTimeTrigger.create(),
-                0L, null);
-        KeyedOneInputStreamOperatorTestHarness<String, FileChunk, FileChunk> testHarness = new KeyedOneInputStreamOperatorTestHarness<String, FileChunk, FileChunk>(operator, new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-        testHarness.open();
-        testHarness.setProcessingTime(3L);
-        testHarness.processElement(new StreamRecord<>(inputFileChunks.get(0)));
-        testHarness.processElement(new StreamRecord<>(inputFileChunks.get(1)));
-        testHarness.processElement(new StreamRecord<>(inputFileChunks.get(2)));
-        testHarness.processElement(new StreamRecord<>(inputFileChunks.get(3)));
-        testHarness.processElement(new StreamRecord<>(inputFileChunks.get(4)));
-        testHarness.setProcessingTime(5000L);
-        expectedOutput.add(new StreamRecord<>(PublicUtil.combine(inputFileChunks.subList(0, 5), maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter).get(0), 2999L));
-        ConcurrentLinkedQueue<Object> actualOutput = testHarness.getOutput();
-        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, new Comparator<Object>() {
-            @Override
-            public int compare(Object o1, Object o2) {
-                StreamRecord sr0 = (StreamRecord) o1;
-                StreamRecord sr1 = (StreamRecord) o2;
-                return Long.compare(((FileChunk) sr0.getValue()).getOffset(), ((FileChunk) sr1.getValue()).getOffset());
-            }
-        });
-    }
     @Test
     public void testMock() throws Exception {
         ProcessWindowFunctionMock mock = Mockito.mock(ProcessWindowFunctionMock.class);
@@ -563,5 +672,4 @@ public class FileChunkCombinerTests {
         }
     }
-
 }