diff --git a/pom.xml b/pom.xml
index 3ebc9b2..67c6e96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
file-chunk-combiner
- 1.2.0
+ 1.3.0
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
index e470d42..f64b6bd 100644
--- a/src/main/java/com/zdjizhi/FileChunkCombiner.java
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -45,7 +45,7 @@ public class FileChunkCombiner {
.filter(new FileChunkFilterFunction(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
.assignTimestampsAndWatermarks(watermarkStrategy);
- OutputTag delayedChunkOutputTag = new OutputTag<>("delayed-chunk") {
+ OutputTag delayedChunkOutputTag = new OutputTag("delayed-chunk") {
};
List> triggers = new ArrayList<>();
@@ -69,7 +69,7 @@ public class FileChunkCombiner {
windowStream.getSideOutput(delayedChunkOutputTag)
.map(new SideOutputMapFunction())
.addSink(new HosSink(configuration))
- .name("Hos Delayed Chunk");
+ .name("Delayed Chunk");
} else {
windowStream.addSink(new HBaseSink(configuration))
.name("HBase")
@@ -77,7 +77,7 @@ public class FileChunkCombiner {
windowStream.getSideOutput(delayedChunkOutputTag)
.map(new SideOutputMapFunction())
.addSink(new HBaseSink(configuration))
- .name("HBase Delayed Chunk");
+ .name("Delayed Chunk");
}
environment.execute(configuration.get(Configs.FLINK_JOB_NAME));
diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
index c94a6f9..9797b1d 100644
--- a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
+++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
@@ -22,8 +22,8 @@ import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_SEEK;
public class CombineChunkProcessWindowFunction extends ProcessWindowFunction {
private static final Log LOG = LogFactory.get();
- public transient Counter duplicateChunkCounter;
- public transient Counter combineErrorCounter;
+ public transient Counter duplicateChunksCounter;
+ public transient Counter combineErrorChunksCounter;
private final int fileMaxChunkCount;
public CombineChunkProcessWindowFunction(int fileMaxChunkCount) {
@@ -34,10 +34,10 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction combine(Iterable input) {
List combinedFileChunkList = new ArrayList<>();
+ List originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList());
try {
- List originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList());
List waitingToCombineChunkList = new ArrayList<>();
if (COMBINE_MODE_SEEK.equals(originalFileChunkList.get(0).getCombineMode())) {
// 按照offset排序
@@ -71,7 +71,7 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction actualOffset) {// 期望offset大于当前offset,该块为重复块,跳过该块
duplicateCount++;
- duplicateChunkCounter.inc();
+ duplicateChunksCounter.inc();
} else if (expectedOffset == actualOffset) {// 期望offset等于当前offset,将该块添加到待合并集合中
if (currentFileChunk.getLastChunkFlag() == 1) {
lastChunkFlag = currentFileChunk.getLastChunkFlag();
@@ -129,7 +129,7 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction {
private final long maxFileSize;
private final String filterExpression;
- public transient Counter filterChunkCounter;
+ public transient Counter filterChunksCounter;
private JexlExpression jexlExpression;
private JexlContext jexlContext;
@@ -25,8 +25,8 @@ public class FileChunkFilterFunction extends RichFilterFunction {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
- filterChunkCounter = metricGroup.counter("filterChunkCount");
- metricGroup.meter("filterChunkPerSecond", new MeterView(filterChunkCounter));
+ filterChunksCounter = metricGroup.counter("filterChunksCount");
+ metricGroup.meter("numChunksFilterPerSecond", new MeterView(filterChunksCounter));
JexlEngine jexlEngine = new JexlBuilder().create();
jexlExpression = jexlEngine.createExpression(filterExpression);
jexlContext = new MapContext();
@@ -35,13 +35,13 @@ public class FileChunkFilterFunction extends RichFilterFunction {
@Override
public boolean filter(FileChunk value) {
if (value == null || value.getOffset() > maxFileSize) {
- filterChunkCounter.inc();
+ filterChunksCounter.inc();
return false;
}
if (StrUtil.isNotEmpty(filterExpression)) {
jexlContext.set(value.getClass().getSimpleName(), value);
if (!Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
- filterChunkCounter.inc();
+ filterChunksCounter.inc();
return false;
}
}
diff --git a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
index 4aeefef..c7add8d 100644
--- a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
+++ b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
@@ -23,9 +23,21 @@ public class ParseMessagePackMapFunction extends RichMapFunction= 1000) {
if (StrUtil.isNotEmpty(rateLimitExpression)) {
@@ -75,7 +111,7 @@ public class ParseMessagePackMapFunction extends RichMapFunction {
- private transient Counter delayedChunkCounter;
+ public transient Counter delayedChunksCounter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
- delayedChunkCounter = metricGroup.counter("delayedChunkCount");
- metricGroup.meter("delayedChunkPerSecond", new MeterView(delayedChunkCounter));
+ delayedChunksCounter = metricGroup.counter("delayedChunksCount");
+ metricGroup.meter("numChunksDelayPerSecond", new MeterView(delayedChunksCounter));
}
@Override
public FileChunk map(FileChunk fileChunk) {
- delayedChunkCounter.inc();
+ delayedChunksCounter.inc();
fileChunk.setChunkCount(1);
if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
fileChunk.setChunkNumbers(fileChunk.getTimestamp() + "-" + fileChunk.getChunk().length + ";");
diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java
index 658b7d3..66ec2c6 100644
--- a/src/main/java/com/zdjizhi/sink/HBaseSink.java
+++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java
@@ -29,10 +29,16 @@ public class HBaseSink extends RichSinkFunction {
private static final Log LOG = LogFactory.get();
private final Configuration configuration;
- public transient Counter sendHBaseCounter;
- public transient Counter sendHBaseErrorCounter;
- public transient Counter sendHBaseFileCounter;
- public transient Counter sendHBaseChunkCounter;
+ public transient Counter sinkRequestsCounter;
+ public transient Counter sinkErrorRequestsCounter;
+ public transient Counter sinkFilesCounter;
+ public transient Counter sinkChunksCounter;
+ public transient Counter lessThan5KBChunksCounter;
+ public transient Counter between5KBAnd10KBChunksCounter;
+ public transient Counter between10KBAnd50KBChunksCounter;
+ public transient Counter between50KBAnd100KBChunksCounter;
+ public transient Counter between100KBAnd1MBChunksCounter;
+ public transient Counter greaterThan1MBChunksCounter;
private boolean isAsync;
private Connection syncHBaseConnection;
private AsyncConnection AsyncHBaseConnection;
@@ -58,14 +64,27 @@ public class HBaseSink extends RichSinkFunction {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
- sendHBaseCounter = metricGroup.counter("sendHBaseCount");
- sendHBaseErrorCounter = metricGroup.counter("sendHBaseErrorCount");
- sendHBaseFileCounter = metricGroup.counter("sendHBaseFileCount");
- sendHBaseChunkCounter = metricGroup.counter("sendHBaseChunkCount");
- metricGroup.meter("sendHBasePerSecond", new MeterView(sendHBaseCounter, 5));
- metricGroup.meter("sendHBaseErrorPerSecond", new MeterView(sendHBaseErrorCounter));
- metricGroup.meter("sendHBaseFilePerSecond", new MeterView(sendHBaseFileCounter));
- metricGroup.meter("sendHBaseChunkPerSecond", new MeterView(sendHBaseChunkCounter));
+ lessThan5KBChunksCounter = metricGroup.counter("lessThan5KBChunksCount");
+ between5KBAnd10KBChunksCounter = metricGroup.counter("between5KBAnd10KBChunksCount");
+ between10KBAnd50KBChunksCounter = metricGroup.counter("between10KBAnd50KBChunksCount");
+ between50KBAnd100KBChunksCounter = metricGroup.counter("between50KBAnd100KBChunksCount");
+ between100KBAnd1MBChunksCounter = metricGroup.counter("between100KBAnd1MBChunksCount");
+ greaterThan1MBChunksCounter = metricGroup.counter("greaterThan1MBChunksCount");
+ metricGroup.meter("numLessThan5KBChunksOutPerSecond", new MeterView(lessThan5KBChunksCounter));
+ metricGroup.meter("numBetween5KBAnd10KBChunksOutPerSecond", new MeterView(between5KBAnd10KBChunksCounter));
+ metricGroup.meter("numBetween10KBAnd50KBChunksOutPerSecond", new MeterView(between10KBAnd50KBChunksCounter));
+ metricGroup.meter("numBetween50KBAnd100KBChunkPsOuterSecond", new MeterView(between50KBAnd100KBChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBChunksOutPerSecond", new MeterView(between100KBAnd1MBChunksCounter));
+ metricGroup.meter("numGreaterThan1MBChunksOutPerSecond", new MeterView(greaterThan1MBChunksCounter));
+ sinkRequestsCounter = metricGroup.counter("sinkRequestsCount");
+ sinkErrorRequestsCounter = metricGroup.counter("sinkErrorRequestsCount");
+ sinkFilesCounter = metricGroup.counter("sinkFilesCount");
+ sinkChunksCounter = metricGroup.counter("sinkChunksCount");
+ metricGroup.meter("numRequestsSinkPerSecond", new MeterView(sinkRequestsCounter, 5));
+ metricGroup.meter("numErrorRequestsSinkPerSecond", new MeterView(sinkErrorRequestsCounter));
+ metricGroup.meter("numFilesSinkPerSecond", new MeterView(sinkFilesCounter));
+ metricGroup.meter("numChunksSinkPerSecond", new MeterView(sinkChunksCounter));
+
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
if (isAsync) {
AsyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection();
@@ -90,16 +109,17 @@ public class HBaseSink extends RichSinkFunction {
@Override
public void invoke(FileChunk fileChunk, Context context) {
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode()) && configuration.get(Configs.SINK_BATCH)) {
- sendHBaseChunkCounter.inc();
+ sinkChunksCounter.inc();
byte[] data = "".getBytes();
if (fileChunk.getChunk() != null) {
data = fileChunk.getChunk();
}
+ int chunkLength = data.length;
long timestamp = System.currentTimeMillis();
Map partMessageMap = new HashMap<>();
partMessageMap.put(APPEND_FILE_PART_MESSAGE_CHUNK_COUNT, fileChunk.getChunkCount() + "");
partMessageMap.put(APPEND_FILE_PART_MESSAGE_LAST_PART_FLAG, fileChunk.getLastChunkFlag() + "");
- partMessageMap.put(APPEND_FILE_PART_MESSAGE_SIZE, data.length + "");
+ partMessageMap.put(APPEND_FILE_PART_MESSAGE_SIZE, chunkLength + "");
Put dataPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid()) + PublicConstants.FILE_DATA_ROW_SUFFIX));
dataPut.addColumn(BYTE_FAMILY_DATA, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), data);
dataPut.addColumn(BYTE_FAMILY_META, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), Bytes.toBytes(partMessageMap.toString()));
@@ -126,25 +146,26 @@ public class HBaseSink extends RichSinkFunction {
Put indexFilenamePut = new Put(Bytes.toBytes(indexFilenameKey));
indexFilenamePut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_FILENAME, Bytes.toBytes(fileChunk.getUuid()));
indexFilenamePutList.add(indexFilenamePut);
- sendHBaseFileCounter.inc();
+ sinkFilesCounter.inc();
} else {
Put metaPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid())));
metaPut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_LAST_MODIFIED, Bytes.toBytes(timestamp));
dataPutList.add(metaPut);
}
chunkCount++;
- chunkSize += data.length;
+ chunkSize += chunkLength;
+ calculateChunkSize(chunkLength);
if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
if (isAsync) {
if (dataPutList.size() > 0) {
List> futures = asyncTable.batch(dataPutList);
- sendHBaseCounter.inc();
+ sinkRequestsCounter.inc();
CompletableFuture.supplyAsync(() -> {
for (CompletableFuture