diff --git a/pom.xml b/pom.xml
index e49e6cf..3ebc9b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>file-chunk-combiner</artifactId>
- <version>1.1.0</version>
+ <version>1.2.0</version>
@@ -292,6 +292,14 @@
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>3.1.2</version>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </plugin>
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
index 5248c90..0bcbb19 100644
--- a/src/main/java/com/zdjizhi/FileChunkCombiner.java
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -40,12 +40,12 @@ public class FileChunkCombiner {
SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
.addSource(KafkaConsumer.byteArrayConsumer(configuration))
.name("Kafka Source")
- .map(new ParseMessagePackMapFunction())
+ .map(new ParseMessagePackMapFunction(configuration.get(Configs.MAP_ENABLE_RATE_LIMIT), configuration.get(Configs.MAP_RATE_LIMIT_THRESHOLD)))
.name("Map: Parse Message Pack")
.filter(new FileChunkFilterFunction(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
.assignTimestampsAndWatermarks(watermarkStrategy);
- OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {
+ OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<>("delayed-chunk") {
};
List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
@@ -57,7 +57,7 @@ public class FileChunkCombiner {
.window(TumblingEventTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_TIME))))
.trigger(trigger)
.sideOutputLateData(delayedChunkOutputTag)
- .process(new CombineChunkProcessWindowFunction(configuration))
+ .process(new CombineChunkProcessWindowFunction(configuration.get(Configs.FILE_MAX_CHUNK_COUNT)))
.name("Window: Combine Chunk")
.setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
.disableChaining();
diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java
index 699f6f1..eaa7765 100644
--- a/src/main/java/com/zdjizhi/config/Configs.java
+++ b/src/main/java/com/zdjizhi/config/Configs.java
@@ -43,6 +43,13 @@ public class Configs {
.stringType()
.noDefaultValue();
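+ // Optional throttling for the parse operator: when map.enable.rate.limit is true, records beyond map.rate.limit.threshold within a one-second window are dropped and counted instead of parsed.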
+ public static final ConfigOption<Boolean> MAP_ENABLE_RATE_LIMIT = ConfigOptions.key("map.enable.rate.limit")
+ .booleanType()
+ .defaultValue(false);
+ public static final ConfigOption<Long> MAP_RATE_LIMIT_THRESHOLD = ConfigOptions.key("map.rate.limit.threshold")
+ .longType()
+ .defaultValue(Long.MAX_VALUE);
+
public static final ConfigOption<Integer> COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism")
.intType()
.defaultValue(1);
diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
index 46f9eaa..c94a6f9 100644
--- a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
+++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
@@ -1,27 +1,33 @@
package com.zdjizhi.function;
-import com.zdjizhi.config.Configs;
+import cn.hutool.core.util.ArrayUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.zdjizhi.pojo.FileChunk;
-import com.zdjizhi.utils.PublicUtil;
+import com.zdjizhi.utils.StringUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_SEEK;
public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {
+ private static final Log LOG = LogFactory.get();
- private transient Counter duplicateChunkCounter;
+ public transient Counter duplicateChunkCounter;
public transient Counter combineErrorCounter;
- public transient Counter seekChunkCounter;
- public transient Counter appendChunkCounter;
- private final Configuration configuration;
+ private final int fileMaxChunkCount;
- public CombineChunkProcessWindowFunction(Configuration configuration) {
- this.configuration = configuration;
+ public CombineChunkProcessWindowFunction(int fileMaxChunkCount) {
+ this.fileMaxChunkCount = fileMaxChunkCount;
}
@Override
@@ -30,15 +36,126 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction
public void process(String key, Context context, Iterable<FileChunk> elements, Collector<FileChunk> out) {
- List<FileChunk> fileChunks = PublicUtil.combine(elements, configuration.get(Configs.FILE_MAX_CHUNK_COUNT), duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ List<FileChunk> fileChunks = combine(elements);
for (FileChunk fileChunk : fileChunks) {
out.collect(fileChunk);
}
}
+
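+ // Combines the chunks of one file collected in this window: seek-mode chunks are stitched together by offset (a gap in offsets splits the output), append-mode chunks are concatenated in timestamp order up to fileMaxChunkCount.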
+ private List<FileChunk> combine(Iterable<FileChunk> input) {
+ List<FileChunk> combinedFileChunkList = new ArrayList<>();
+ try {
+ List<FileChunk> originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList());
+ List<byte[]> waitingToCombineChunkList = new ArrayList<>();
+ if (COMBINE_MODE_SEEK.equals(originalFileChunkList.get(0).getCombineMode())) {
+ // sort by offset
+ originalFileChunkList.sort(Comparator.comparingLong(FileChunk::getOffset));
+ Iterator<FileChunk> originalFileChunkIterator = originalFileChunkList.iterator();
+ if (originalFileChunkIterator.hasNext()) {
+ int duplicateCount = 0;
+ FileChunk currentFileChunk = originalFileChunkIterator.next();
+ int lastChunkFlag = currentFileChunk.getLastChunkFlag();
+ long startOffset = currentFileChunk.getOffset();
+ if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) {
+ waitingToCombineChunkList.add(currentFileChunk.getChunk());
+ }
+ while (originalFileChunkIterator.hasNext()) {
+ long expectedOffset = currentFileChunk.getOffset() + currentFileChunk.getLength();
+ currentFileChunk = originalFileChunkIterator.next();
+ long actualOffset = currentFileChunk.getOffset();
+ if (expectedOffset > actualOffset) {// expected offset is greater than the current offset: this chunk is a duplicate, skip it
+ duplicateCount++;
+ duplicateChunkCounter.inc();
+ } else if (expectedOffset == actualOffset) {// expected offset equals the current offset: add this chunk to the pending-merge list
+ if (currentFileChunk.getLastChunkFlag() == 1) {
+ lastChunkFlag = currentFileChunk.getLastChunkFlag();
+ }
+ if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) {
+ waitingToCombineChunkList.add(currentFileChunk.getChunk());
+ }
+ } else {// expected offset is less than the current offset: a chunk is missing
+ if (waitingToCombineChunkList.size() > 0) {// merge the chunks collected so far and clear the list
+ combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), 0, null));
+ waitingToCombineChunkList.clear();
+ } else {
+ if (lastChunkFlag == 1) {
+ combinedFileChunkList.add(currentFileChunk);
+ }
+ }
+ // treat the current chunk as the first chunk and continue merging
+ startOffset = currentFileChunk.getOffset();// reset the start offset
+ lastChunkFlag = currentFileChunk.getLastChunkFlag();
+ if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) {
+ waitingToCombineChunkList.add(currentFileChunk.getChunk());
+ }
+ }
+ }
+ if (waitingToCombineChunkList.size() > 0) {
+ combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), 0, null));
+ } else {
+ if (lastChunkFlag == 1) {
+ combinedFileChunkList.add(currentFileChunk);
+ }
+ }
+ if (duplicateCount > 0) {
+ LOG.error("Duplicate upload chunk, uuid=" + currentFileChunk.getUuid() + ", repetitionCount=" + duplicateCount);
+ }
+ }
+ } else {
+ // sort by timestamp
+ originalFileChunkList.sort(Comparator.comparingLong(FileChunk::getTimestamp));
+ long startTimestamp = originalFileChunkList.get(0).getTimestamp();
+ StringBuilder timestampAndSizes = new StringBuilder();
+ for (FileChunk originalFileChunk : originalFileChunkList) {
+ byte[] chunk = originalFileChunk.getChunk();
+ if (chunk != null && chunk.length > 0) {
+ chunk = originalFileChunk.getChunk();
+ waitingToCombineChunkList.add(chunk);
+ timestampAndSizes.append(originalFileChunk.getTimestamp()).append("-").append(chunk.length).append(";");
+ }
+ if (waitingToCombineChunkList.size() > fileMaxChunkCount) {
+ break;
+ }
+ }
+ if (waitingToCombineChunkList.size() > 0) {
+ combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, originalFileChunkList.get(0).getUuid(), originalFileChunkList.get(0).getFileName(), originalFileChunkList.get(0).getFileType(), 0, "append", 0, originalFileChunkList.get(0).getMeta(), startTimestamp, timestampAndSizes.toString()));
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Combiner chunk error.", e);
+ combineErrorCounter.inc();
+ }
+ return combinedFileChunkList;
+ }
+
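+ // Concatenates the collected byte arrays into one chunk and carries over the shared file attributes (uuid, name, type, meta, combine mode).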
+ private FileChunk combineChunk(List<byte[]> byteList, String uuid, String fileName, String fileType, long offset, String combineMode, int lastChunkFlag, Map<String, Object> metaMap, long startTimestamp, String chunkNumbers) {
+ FileChunk fileChunk = new FileChunk();
+ fileChunk.setChunkCount(byteList.size());
+ byte[][] bytes = new byte[byteList.size()][];
+ byteList.toArray(bytes);
+ byte[] newData = ArrayUtil.addAll(bytes);
+ if (COMBINE_MODE_SEEK.equals(combineMode)) {
+ fileChunk.setOffset(offset);
+ fileChunk.setLastChunkFlag(lastChunkFlag);
+ } else {
+ if (StringUtil.isNotEmpty(chunkNumbers)) {
+ fileChunk.setChunkNumbers(chunkNumbers);
+ }
+ }
+ fileChunk.setTimestamp(startTimestamp);
+ fileChunk.setFileType(fileType);
+ fileChunk.setUuid(uuid);
+ fileChunk.setChunk(newData);
+ fileChunk.setFileName(fileName);
+ fileChunk.setCombineMode(combineMode);
+ fileChunk.setLength(newData.length);
+ fileChunk.setMeta(metaMap);
+ return fileChunk;
+ }
}
diff --git a/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java b/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java
index 6137194..663306e 100644
--- a/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java
+++ b/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java
@@ -6,12 +6,13 @@ import org.apache.commons.jexl3.*;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
private final long maxFileSize;
private final String filterExpression;
- private transient Counter filterChunkCounter;
+ public transient Counter filterChunkCounter;
private JexlExpression jexlExpression;
private JexlContext jexlContext;
@@ -25,6 +26,7 @@ public class FileChunkFilterFunction extends RichFilterFunction {
super.open(parameters);
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
filterChunkCounter = metricGroup.counter("filterChunkCount");
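+ // Also publish the filter counter as a per-second meter.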
+ metricGroup.meter("filterChunkPerSecond", new MeterView(filterChunkCounter));
JexlEngine jexlEngine = new JexlBuilder().create();
jexlExpression = jexlEngine.createExpression(filterExpression);
jexlContext = new MapContext();
diff --git a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
index 3e0fc34..2535421 100644
--- a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
+++ b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
@@ -6,17 +6,68 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.pojo.FileChunk;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.*;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import java.util.HashMap;
import java.util.Map;
+import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_APPEND;
+
public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChunk> {
private static final Log LOG = LogFactory.get();
+ public transient Counter parseMessagePackCounter;
+ public transient Counter parseMessagePackErrorCounter;
+ public transient Counter rateLimitDropCounter;
+ private final boolean enableRateLimit;
+ private final long rateLimitThreshold;
+ private long timestamp;
+ private long count;
+
+ public ParseMessagePackMapFunction(boolean enableRateLimit, long rateLimitThreshold) {
+ this.rateLimitThreshold = rateLimitThreshold;
+ this.enableRateLimit = enableRateLimit;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ parseMessagePackCounter = metricGroup.counter("parseMessagePackCount");
+ parseMessagePackErrorCounter = metricGroup.counter("parseMessagePackErrorCount");
+ rateLimitDropCounter = metricGroup.counter("rateLimitDropCount");
+ metricGroup.meter("parseMessagePackPerSecond", new MeterView(parseMessagePackCounter));
+ metricGroup.meter("parseMessagePackErrorPerSecond", new MeterView(parseMessagePackErrorCounter));
+ metricGroup.meter("rateLimitDropPerSecond", new MeterView(rateLimitDropCounter));
+ timestamp = System.currentTimeMillis();
+ count = 0;
+ }
+
@Override
public FileChunk map(byte[] messagePackData) {
+ FileChunk fileChunk = null;
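+ // Simple per-second rate limit: parse while the running count stays at or below the threshold inside the current one-second window, otherwise drop and count the record; the timestamp and count reset once the second has elapsed.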
+ if (enableRateLimit) {
+ count++;
+ if (System.currentTimeMillis() - timestamp < 1000 && count <= rateLimitThreshold) {
+ fileChunk = parseMessagePack(messagePackData);
+ } else if (System.currentTimeMillis() - timestamp < 1000 && count > rateLimitThreshold) {
+ rateLimitDropCounter.inc();
+ } else {
+ rateLimitDropCounter.inc();
+ timestamp = System.currentTimeMillis();
+ count = 0;
+ }
+ } else {
+ fileChunk = parseMessagePack(messagePackData);
+ }
+ return fileChunk;
+ }
+
+ private FileChunk parseMessagePack(byte[] messagePackData) {
+ parseMessagePackCounter.inc();
FileChunk fileChunk;
try {
fileChunk = new FileChunk();
@@ -66,10 +117,11 @@ public class ParseMessagePackMapFunction extends RichMapFunction {
- private transient Counter pcapDelayedChunkCounter;
- private transient Counter trafficDelayedChunkCounter;
+ private transient Counter delayedChunkCounter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
- pcapDelayedChunkCounter = metricGroup.counter("pcapDelayedChunkCount");
- trafficDelayedChunkCounter = metricGroup.counter("trafficDelayedChunkCount");
+ delayedChunkCounter = metricGroup.counter("delayedChunkCount");
+ metricGroup.meter("delayedChunkPerSecond", new MeterView(delayedChunkCounter));
}
@Override
public FileChunk map(FileChunk fileChunk) {
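+ // Every late chunk is forwarded on its own (chunkCount = 1); append-mode chunks also record their timestamp and length in chunkNumbers, matching the format produced by the window combine.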
+ delayedChunkCounter.inc();
fileChunk.setChunkCount(1);
- if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
- trafficDelayedChunkCounter.inc();
- } else {
+ if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
fileChunk.setChunkNumbers(fileChunk.getTimestamp() + "-" + fileChunk.getChunk().length + ";");
- pcapDelayedChunkCounter.inc();
}
return fileChunk;
}
diff --git a/src/main/java/com/zdjizhi/pojo/FileChunk.java b/src/main/java/com/zdjizhi/pojo/FileChunk.java
index 4fa14d9..c2282aa 100644
--- a/src/main/java/com/zdjizhi/pojo/FileChunk.java
+++ b/src/main/java/com/zdjizhi/pojo/FileChunk.java
@@ -167,16 +167,19 @@ public class FileChunk implements Serializable {
length == fileChunk.length &&
lastChunkFlag == fileChunk.lastChunkFlag &&
chunkCount == fileChunk.chunkCount &&
+ timestamp == fileChunk.timestamp &&
Objects.equals(uuid, fileChunk.uuid) &&
Objects.equals(fileName, fileChunk.fileName) &&
Objects.equals(fileType, fileChunk.fileType) &&
Arrays.equals(chunk, fileChunk.chunk) &&
- Objects.equals(combineMode, fileChunk.combineMode);
+ Objects.equals(combineMode, fileChunk.combineMode) &&
+ Objects.equals(meta, fileChunk.meta) &&
+ Objects.equals(chunkNumbers, fileChunk.chunkNumbers);
}
@Override
public int hashCode() {
- int result = Objects.hash(uuid, fileName, fileType, offset, length, combineMode, lastChunkFlag, chunkCount);
+ int result = Objects.hash(uuid, fileName, fileType, offset, length, combineMode, lastChunkFlag, chunkCount, timestamp, meta, chunkNumbers);
result = 31 * result + Arrays.hashCode(chunk);
return result;
}
diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java
index bf6a494..658b7d3 100644
--- a/src/main/java/com/zdjizhi/sink/HBaseSink.java
+++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java
@@ -11,6 +11,7 @@ import com.zdjizhi.utils.PublicConstants;
import com.zdjizhi.utils.PublicUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
@@ -61,6 +62,10 @@ public class HBaseSink extends RichSinkFunction {
sendHBaseErrorCounter = metricGroup.counter("sendHBaseErrorCount");
sendHBaseFileCounter = metricGroup.counter("sendHBaseFileCount");
sendHBaseChunkCounter = metricGroup.counter("sendHBaseChunkCount");
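+ // Expose per-second rates for the counters; the send meter averages over a 5 second MeterView window to smooth short bursts.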
+ metricGroup.meter("sendHBasePerSecond", new MeterView(sendHBaseCounter, 5));
+ metricGroup.meter("sendHBaseErrorPerSecond", new MeterView(sendHBaseErrorCounter));
+ metricGroup.meter("sendHBaseFilePerSecond", new MeterView(sendHBaseFileCounter));
+ metricGroup.meter("sendHBaseChunkPerSecond", new MeterView(sendHBaseChunkCounter));
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
if (isAsync) {
AsyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection();
diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java
index 03e5f46..c486529 100644
--- a/src/main/java/com/zdjizhi/sink/HosSink.java
+++ b/src/main/java/com/zdjizhi/sink/HosSink.java
@@ -11,6 +11,7 @@ import com.zdjizhi.utils.PublicUtil;
import org.apache.commons.lang.CharEncoding;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpResponse;
@@ -69,6 +70,10 @@ public class HosSink extends RichSinkFunction {
sendHosErrorCounter = metricGroup.counter("sendHosErrorCount");
sendHosFileCounter = metricGroup.counter("sendHosFileCount");
sendHosChunkCounter = metricGroup.counter("sendHosChunkCount");
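+ // Per-second meters for the HOS counters; the send meter averages over a 5 second window.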
+ metricGroup.meter("sendHosPerSecond", new MeterView(sendHosCounter, 5));
+ metricGroup.meter("sendHosErrorPerSecond", new MeterView(sendHosErrorCounter));
+ metricGroup.meter("sendHosFilePerSecond", new MeterView(sendHosFileCounter));
+ metricGroup.meter("sendHosChunkPerSecond", new MeterView(sendHosChunkCounter));
loadBalanceMode = configuration.getInteger(Configs.SINK_HOS_LOAD_BALANCE_MODE);
if (loadBalanceMode == 0) {
endpoint = configuration.getString(Configs.SINK_HOS_ENDPOINT);
diff --git a/src/main/java/com/zdjizhi/utils/PublicUtil.java b/src/main/java/com/zdjizhi/utils/PublicUtil.java
index 02f35b8..ef027bc 100644
--- a/src/main/java/com/zdjizhi/utils/PublicUtil.java
+++ b/src/main/java/com/zdjizhi/utils/PublicUtil.java
@@ -1,134 +1,10 @@
package com.zdjizhi.utils;
-import cn.hutool.core.util.ArrayUtil;
import cn.hutool.crypto.digest.DigestUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.pojo.FileChunk;
-import org.apache.flink.metrics.Counter;
import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_SEEK;
public class PublicUtil {
- private static final Log LOG = LogFactory.get();
-
- public static List<FileChunk> combine(Iterable<FileChunk> input, long keyMaxChunk, Counter duplicateChunkCounter, Counter combineErrorCounter, Counter seekChunkCounter, Counter appendChunkCounter) {
- List<FileChunk> combinedFileChunkList = new ArrayList<>();
- try {
- List<FileChunk> originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList());
- List<byte[]> waitingToCombineChunkList = new ArrayList<>();
- if (COMBINE_MODE_SEEK.equals(originalFileChunkList.get(0).getCombineMode())) {
- seekChunkCounter.inc();
- // sort by offset
- originalFileChunkList.sort(Comparator.comparingLong(FileChunk::getOffset));
- Iterator<FileChunk> originalFileChunkIterator = originalFileChunkList.iterator();
- if (originalFileChunkIterator.hasNext()) {
- int duplicateCount = 0;
- FileChunk currentFileChunk = originalFileChunkIterator.next();
- int lastChunkFlag = currentFileChunk.getLastChunkFlag();
- long startOffset = currentFileChunk.getOffset();
- if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) {
- waitingToCombineChunkList.add(currentFileChunk.getChunk());
- }
- while (originalFileChunkIterator.hasNext()) {
- seekChunkCounter.inc();
- long expectedOffset = currentFileChunk.getOffset() + currentFileChunk.getLength();
- currentFileChunk = originalFileChunkIterator.next();
- long actualOffset = currentFileChunk.getOffset();
- if (expectedOffset > actualOffset) {// expected offset is greater than the current offset: this chunk is a duplicate, skip it
- duplicateCount++;
- } else if (expectedOffset == actualOffset) {// expected offset equals the current offset: add this chunk to the pending-merge list
- if (currentFileChunk.getLastChunkFlag() == 1) {
- lastChunkFlag = currentFileChunk.getLastChunkFlag();
- }
- if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) {
- waitingToCombineChunkList.add(currentFileChunk.getChunk());
- }
- } else {// expected offset is less than the current offset: a chunk is missing
- if (waitingToCombineChunkList.size() > 0) {// merge the chunks collected so far and clear the list
- combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), 0, null));
- waitingToCombineChunkList.clear();
- } else {
- if (lastChunkFlag == 1) {
- combinedFileChunkList.add(currentFileChunk);
- }
- }
- // treat the current chunk as the first chunk and continue merging
- startOffset = currentFileChunk.getOffset();// reset the start offset
- lastChunkFlag = currentFileChunk.getLastChunkFlag();
- if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) {
- waitingToCombineChunkList.add(currentFileChunk.getChunk());
- }
- }
- }
- if (waitingToCombineChunkList.size() > 0) {
- combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), 0, null));
- } else {
- if (lastChunkFlag == 1) {
- combinedFileChunkList.add(currentFileChunk);
- }
- }
- if (duplicateCount > 0) {
- LOG.error("Duplicate upload chunk, uuid=" + currentFileChunk.getUuid() + ", repetitionCount=" + duplicateCount);
- duplicateChunkCounter.inc(duplicateCount);
- }
- }
- } else {
- // sort by timestamp
- originalFileChunkList.sort(Comparator.comparingLong(FileChunk::getTimestamp));
- long startTimestamp = originalFileChunkList.get(0).getTimestamp();
- StringBuilder timestampAndSizes = new StringBuilder();
- for (FileChunk originalFileChunk : originalFileChunkList) {
- appendChunkCounter.inc();
- byte[] chunk = originalFileChunk.getChunk();
- if (chunk != null && chunk.length > 0) {
- chunk = originalFileChunk.getChunk();
- waitingToCombineChunkList.add(chunk);
- timestampAndSizes.append(originalFileChunk.getTimestamp()).append("-").append(chunk.length).append(";");
- }
- if (waitingToCombineChunkList.size() > keyMaxChunk) {
- break;
- }
- }
- if (waitingToCombineChunkList.size() > 0) {
- combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, originalFileChunkList.get(0).getUuid(), originalFileChunkList.get(0).getFileName(), originalFileChunkList.get(0).getFileType(), 0, "append", 0, originalFileChunkList.get(0).getMeta(), startTimestamp, timestampAndSizes.toString()));
- }
- }
- } catch (Exception e) {
- LOG.error("Combiner file error.", e);
- combineErrorCounter.inc();
- }
- return combinedFileChunkList;
- }
-
- private static FileChunk combineChunk(List<byte[]> byteList, String uuid, String fileName, String fileType, long offset, String combineMode, int lastChunkFlag, Map<String, Object> metaMap, long startTimestamp, String chunkNumbers) {
- FileChunk fileChunk = new FileChunk();
- fileChunk.setChunkCount(byteList.size());
- byte[][] bytes = new byte[byteList.size()][];
- byteList.toArray(bytes);
- byte[] newData = ArrayUtil.addAll(bytes);
- if (COMBINE_MODE_SEEK.equals(combineMode)) {
- fileChunk.setOffset(offset);
- fileChunk.setLastChunkFlag(lastChunkFlag);
- } else {
- if (StringUtil.isNotEmpty(chunkNumbers)) {
- fileChunk.setChunkNumbers(chunkNumbers);
- }
- }
- fileChunk.setTimestamp(startTimestamp);
- fileChunk.setFileType(fileType);
- fileChunk.setUuid(uuid);
- fileChunk.setChunk(newData);
- fileChunk.setFileName(fileName);
- fileChunk.setCombineMode(combineMode);
- fileChunk.setLength(newData.length);
- fileChunk.setMeta(metaMap);
- return fileChunk;
- }
public static String getUUID() {
return UUID.randomUUID().toString().replace("-", "").toLowerCase();
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index 55031b1..51fee46 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -18,6 +18,8 @@ source.kafka.user=admin
source.kafka.pin=galaxy2019
#library required for SSL
source.kafka.tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
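+#parse operator rate limiting: when enabled, records above the per-second threshold are dropped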
+map.enable.rate.limit=false
+map.rate.limit.threshold=10000
#
combiner.window.parallelism=2
combiner.window.time=10
@@ -31,14 +33,14 @@ file.max.size=1073741824
sink.parallelism=2
sink.type=hos
sink.async=false
-sink.batch=true
+sink.batch=false
sink.batch.count=100
sink.batch.size=102400
#hos sink
#load balance mode: 0 goes through nginx, 1 round-robins over hos nodes, default 0
sink.hos.load.balance.mode=1
#nginx mode uses a single ip:port; round-robin hos mode uses ip1,ip2:port1,port2
-sink.hos.endpoint=192.168.40.151,192.168.40.152,192.168.40.203:8186
+sink.hos.endpoint=192.168.41.29:8186
sink.hos.bucket=traffic_file_bucket
sink.hos.token=c21f969b5f03d33d43e04f8f136e7682
sink.hos.http.error.retry=3
diff --git a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java
index 26f5c7b..28cc429 100644
--- a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java
+++ b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java
@@ -4,36 +4,29 @@ import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.RandomUtil;
import com.zdjizhi.config.Configs;
-import com.zdjizhi.function.CombineChunkProcessWindowFunction;
-import com.zdjizhi.function.FileChunkFilterFunction;
-import com.zdjizhi.function.FileChunkKeySelector;
-import com.zdjizhi.function.ParseMessagePackMapFunction;
+import com.zdjizhi.function.*;
import com.zdjizhi.pojo.FileChunk;
import com.zdjizhi.sink.HBaseSink;
import com.zdjizhi.sink.HosSink;
import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger;
import com.zdjizhi.trigger.MultipleTrigger;
-import com.zdjizhi.utils.PublicUtil;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.operators.*;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -49,6 +42,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
import org.junit.*;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -60,38 +54,28 @@ import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
public class FileChunkCombinerTests {
- private static Counter duplicateChunkCounter;
- private static Counter combineErrorCounter;
- private static Counter seekChunkCounter;
- private static Counter appendChunkCounter;
private File emlFile;
private byte[] emlFileBytes;
private byte[] pcapngFileBytes;
private List<FileChunk> inputFileChunks;
+ private List inputFiles;
private List<byte[]> messagePackList;
private List<FileChunk> emlFileChunks;
private List<FileChunk> pcapngFileChunks;
private List<FileChunk> pcapngIncludeMetaFileChunks;
private Map<String, Object> pcapngFileMeta;
- private String emlUuid = "1111111111";
- private String pcapngUuid = "2222222222";
- private String pcapngIncludeMetaUuid = "3333333333";
private int emlChunkCount = 10;
private int pcapngChunkCount = 10;
- private long maxChunkCount;
private String pcapChunkData = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
private static Configuration configuration;
+ private CombineChunkProcessWindowFunction processWindowFunction;
+ private OutputTag<FileChunk> delayedChunkOutputTag;
+ private KeyedOneInputStreamOperatorTestHarness<String, FileChunk, FileChunk> testHarness;
@Before
public void testBefore() throws Exception {
- String path = FileChunkCombinerTests.class.getClassLoader().getResource("common.properties").getPath();
- ParameterTool parameterTool = ParameterTool.fromPropertiesFile(path);
+ ParameterTool parameterTool = ParameterTool.fromPropertiesFile(FileChunkCombinerTests.class.getClassLoader().getResource("common.properties").getPath());
configuration = parameterTool.getConfiguration();
- duplicateChunkCounter = new SimpleCounter();
- combineErrorCounter = new SimpleCounter();
- seekChunkCounter = new SimpleCounter();
- appendChunkCounter = new SimpleCounter();
- maxChunkCount = configuration.get(Configs.FILE_MAX_CHUNK_COUNT);
String filePath = "src" + File.separator + "test" + File.separator + "data" + File.separator + "test.eml";
emlFile = new File(filePath);
emlFileBytes = FileUtil.readBytes(emlFile);
@@ -104,38 +88,68 @@ public class FileChunkCombinerTests {
pcapngFileMeta.put("ruleId", 151);
pcapngFileMeta.put("taskId", 7477);
pcapngFileMeta.put("sledIP", "127.0.0.1");
- inputFileChunks = new ArrayList<>();
emlFileChunks = new ArrayList<>();
pcapngFileChunks = new ArrayList<>();
pcapngIncludeMetaFileChunks = new ArrayList<>();
- ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction();
- ObjectInputStream inputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "messagePacks"));
- messagePackList = (List<byte[]>) inputStream.readObject();
- for (byte[] messagePack : messagePackList) {
- FileChunk fileChunk = mapFunction.map(messagePack);
- inputFileChunks.add(fileChunk);
- }
+ ObjectInputStream messagePacksInputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "messagePacks"));
+ messagePackList = (List<byte[]>) messagePacksInputStream.readObject();
+ messagePacksInputStream.close();
+ ObjectInputStream fileChunksInputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "fileChunks"));
+ inputFileChunks = (List<FileChunk>) fileChunksInputStream.readObject();
+ fileChunksInputStream.close();
+ ObjectInputStream filesInputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "files"));
+ inputFiles = (List) filesInputStream.readObject();
+ filesInputStream.close();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
+ triggers.add(EventTimeTrigger.create());
+ triggers.add(LastChunkOrNoDataInTimeTrigger.of(1000));
+ Trigger