diff --git a/pom.xml b/pom.xml
index aeefdca..11667bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>file-chunk-combiner</artifactId>
- <version>1.3.0</version>
+ <version>1.3.1</version>
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
index 705f673..f60fefe 100644
--- a/src/main/java/com/zdjizhi/FileChunkCombiner.java
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -17,7 +17,6 @@ import com.zdjizhi.trigger.LastChunkTrigger;
import com.zdjizhi.trigger.MultipleTrigger;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
@@ -174,35 +173,6 @@ public class FileChunkCombiner {
.filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_proxy_file_meta"))
.name("Filter: Map")
.setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
- windowStream
- .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_OSS_FILTER_EXPRESSION), "sink_oss"))
- .name("Filter: Oss")
- .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
- .union(fileMetaSessionSingleOutputStreamOperator, fileMetaProxySingleOutputStreamOperator)
- .keyBy(new FileChunkKeySelector())
- .addSink(new OssSinkByEhcache(configuration))
- .name("Oss")
- .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
- break;
- case "oss-caffeine":
- fileMetaSessionSingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC)))
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
- .name(configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC))
- .flatMap(new ParseSessionFileMetaFlatMapFunction())
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
- .name("Map: Parse Session File Meta")
- .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_session_file_meta"))
- .name("Filter: Map")
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
- fileMetaProxySingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC)))
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
- .name(configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC))
- .flatMap(new ParseProxyFileMetaFlatMapFunction())
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
- .name("Map: Parse Proxy File Meta")
- .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_proxy_file_meta"))
- .name("Filter: Map")
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
windowStream
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_OSS_FILTER_EXPRESSION), "sink_oss"))
.name("Filter: Oss")
@@ -213,38 +183,6 @@ public class FileChunkCombiner {
.name("Oss")
.setParallelism(configuration.get(Configs.SINK_PARALLELISM));
break;
- case "test":
- fileMetaSessionSingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC)))
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
- .name(configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC))
- .flatMap(new ParseSessionFileMetaFlatMapFunction())
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
- .name("Map: Parse Session File Meta")
- .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_session_file_meta"))
- .name("Filter: Map")
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
- fileMetaProxySingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC)))
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
- .name(configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC))
- .flatMap(new ParseProxyFileMetaFlatMapFunction())
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
- .name("Map: Parse Proxy File Meta")
- .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_proxy_file_meta"))
- .name("Filter: Map")
- .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
- KeyedStream<FileChunk, String> fileMetaStringKeyedStream = fileMetaSessionSingleOutputStreamOperator
- .union(fileMetaProxySingleOutputStreamOperator)
- .keyBy((KeySelector<FileChunk, String>) FileChunk::getUuid);
- windowStream
- .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_OSS_FILTER_EXPRESSION), "sink_oss"))
- .name("Filter: Oss")
- .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
- .keyBy((KeySelector<FileChunk, String>) FileChunk::getUuid)
- .connect(fileMetaStringKeyedStream)
- .process(new TestKeyedCoProcessFunction(configuration))
- .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
- .name("Oss");
- break;
}
}
environment.execute(configuration.get(Configs.FLINK_JOB_NAME));
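Note: after this removal the "oss-caffeine" case reuses the session/proxy file-meta streams built once above, so the only sink wiring left is the union-key-sink chain retained at the end of the hunk. A minimal sketch of that surviving shape, using the variable and class names from this file (illustration only, not the literal source):

    // Surviving sink path for the "oss-caffeine" case.
    windowStream
            .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_OSS_FILTER_EXPRESSION), "sink_oss"))
            .union(fileMetaSessionSingleOutputStreamOperator, fileMetaProxySingleOutputStreamOperator)
            .keyBy(new FileChunkKeySelector())
            .addSink(new OssSinkByEhcache(configuration));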
diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java
index 0f642bf..383d0c0 100644
--- a/src/main/java/com/zdjizhi/config/Configs.java
+++ b/src/main/java/com/zdjizhi/config/Configs.java
@@ -84,6 +84,9 @@ public class Configs {
public static final ConfigOption<Long> SINK_BATCH_SIZE = ConfigOptions.key("sink.batch.size")
.longType()
.defaultValue(Long.MAX_VALUE);
+ public static final ConfigOption<Integer> SINK_BATCH_TIME = ConfigOptions.key("sink.batch.time")
+ .intType()
+ .defaultValue(5);
public static final ConfigOption<String> SINK_FILTER_EXPRESSION = ConfigOptions.key("sink.filter.expression")
.stringType()
.defaultValue("");
diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
index cd5b0df..fd1ca2b 100644
--- a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
+++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
@@ -120,8 +120,8 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {
- if (waitingToCombineChunkList.size() > 0) { // merge the combinable chunks, then clear the list
- FileChunk fileChunk = combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), originalFileChunkList.get(0).getTimestamp(), null);
+ if (!waitingToCombineChunkList.isEmpty()) { // merge the combinable chunks, then clear the list
+ FileChunk fileChunk = combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), currentFileChunk.getTimestamp(), null);
if (fileChunk != null) {
combinedFileChunkList.add(fileChunk);
}
@@ -139,8 +139,8 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {
- if (waitingToCombineChunkList.size() > 0) {
- FileChunk fileChunk = combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), originalFileChunkList.get(0).getTimestamp(), null);
+ if (!waitingToCombineChunkList.isEmpty()) {
+ FileChunk fileChunk = combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), currentFileChunk.getTimestamp(), null);
if (fileChunk != null) {
combinedFileChunkList.add(fileChunk);
}
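Note: besides the isEmpty() cleanup, the functional change in this file is the timestamp passed to combineChunk: the merged chunk now inherits the timestamp of the chunk that completes the merge rather than that of the first buffered chunk. In isolation (accessors as used in this file):

    long mergedTimestamp = currentFileChunk.getTimestamp();                // new behaviour
    // long mergedTimestamp = originalFileChunkList.get(0).getTimestamp(); // old behaviour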
diff --git a/src/main/java/com/zdjizhi/function/TestKeyedCoProcessFunction.java b/src/main/java/com/zdjizhi/function/TestKeyedCoProcessFunction.java
deleted file mode 100644
index fb71fd3..0000000
--- a/src/main/java/com/zdjizhi/function/TestKeyedCoProcessFunction.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.zdjizhi.function;
-
-import cn.hutool.core.io.IoUtil;
-import com.zdjizhi.config.Configs;
-import com.zdjizhi.pojo.FileChunk;
-import com.zdjizhi.utils.HBaseConnectionUtil;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class TestKeyedCoProcessFunction extends KeyedCoProcessFunction<String, FileChunk, FileChunk, FileChunk> {
-
- private final Configuration configuration;
- public transient Counter chunksInCounter;
- public transient Counter fileMetasInCounter;
-
- private boolean isAsync;
- private Connection syncHBaseConnection;
- private AsyncConnection AsyncHBaseConnection;
- private Table table;
- private AsyncTable<AdvancedScanResultConsumer> asyncTable;
- private List<Put> dataPutList;
- private List<Put> metaPutList;
- private long maxBatchCount;
-
- public TestKeyedCoProcessFunction(Configuration configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "add_file_meta");
- chunksInCounter = metricGroup.counter("chunksInCount");
- fileMetasInCounter = metricGroup.counter("fileMetasInCount");
- metricGroup.meter("numChunksInPerSecond", new MeterView(chunksInCounter));
- metricGroup.meter("numFileMetasInPerSecond", new MeterView(fileMetasInCounter));
- isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
- if (isAsync) {
- AsyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection();
- asyncTable = AsyncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET)));
- } else {
- syncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getSyncHBaseConnection();
- table = syncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET)));
- }
- maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT);
- dataPutList = new ArrayList<>();
- metaPutList = new ArrayList<>();
- }
-
- @Override
- public void processElement1(FileChunk value, Context ctx, Collector<FileChunk> out) throws IOException, InterruptedException {
- chunksInCounter.inc();
- Put dataPut = new Put(value.getUuid().getBytes());
- dataPut.addColumn("meta".getBytes(), "data".getBytes(), (value.toString()).getBytes());
- dataPutList.add(dataPut);
- if (dataPutList.size() >= maxBatchCount) {
- if (isAsync) {
- asyncTable.batch(dataPutList);
- dataPutList.clear();
- } else {
- table.batch(dataPutList, null);
- dataPutList.clear();
- }
- }
- }
-
- @Override
- public void processElement2(FileChunk value, Context ctx, Collector<FileChunk> out) throws IOException, InterruptedException {
- fileMetasInCounter.inc();
- Put metaPut = new Put(value.getUuid().getBytes());
- metaPut.addColumn("meta".getBytes(), "meta".getBytes(), (value.getMeta().toString()).getBytes());
- metaPutList.add(metaPut);
- if (metaPutList.size() >= maxBatchCount) {
- if (isAsync) {
- asyncTable.batch(metaPutList);
- metaPutList.clear();
- } else {
- table.batch(metaPutList, null);
- metaPutList.clear();
- }
- }
- }
-
- @Override
- public void close() {
- IoUtil.close(table);
- IoUtil.close(syncHBaseConnection);
- IoUtil.close(AsyncHBaseConnection);
- }
-}
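Note: the deleted test path buffered Puts and wrote them with the same HBase client batch call that HBaseSink keeps using below; its close() also dropped any still-buffered Puts without flushing. For reference, a minimal synchronous batch flush with the hbase-client API looks roughly like this (table and puts assumed to exist; a results array is used where the deleted code passed null):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import java.util.List;

    static void flushPuts(Table table, List<Put> puts) throws Exception {
        Object[] results = new Object[puts.size()]; // one result slot per action
        table.batch(puts, results);                 // blocks until every Put has completed
        puts.clear();                               // reset the buffer only after a successful flush
    }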
diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java
index 3bab6aa..ffcef6e 100644
--- a/src/main/java/com/zdjizhi/sink/HBaseSink.java
+++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java
@@ -23,6 +23,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import static com.zdjizhi.utils.PublicConstants.*;
import static com.zdjizhi.utils.HBaseColumnConstants.*;
@@ -61,7 +64,7 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
public transient Counter mediaChunksCounter;
private boolean isAsync;
private Connection syncHBaseConnection;
- private AsyncConnection AsyncHBaseConnection;
+ private AsyncConnection asyncHBaseConnection;
private Table table;
private Table indexTimeTable;
private Table indexFilenameTable;
@@ -72,12 +75,12 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
private List<Put> indexTimePutList;
private List<Put> indexFilenamePutList;
private long chunkSize;
- private int chunkCount;
private long maxBatchSize;
private long maxBatchCount;
+ private ScheduledExecutorService executorService;
private long rateLimitThreshold;
private String rateLimitExpression;
- private long timestamp;
+ private volatile long timestamp;
private long count;
private JexlExpression jexlExpression;
private JexlContext jexlContext;
@@ -148,57 +151,75 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
metricGroup.meter("numMediaChunksOutPerSecond", new MeterView(mediaChunksCounter));
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
if (isAsync) {
- AsyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection();
- asyncTable = AsyncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET)));
- asyncIndexTimeTable = AsyncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET)));
- asyncIndexFilenameTable = AsyncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ asyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection();
+ asyncTable = asyncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ asyncIndexTimeTable = asyncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ asyncIndexFilenameTable = asyncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
} else {
syncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getSyncHBaseConnection();
table = syncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET)));
indexTimeTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET)));
indexFilenameTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
}
- maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE);
- maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT);
- dataPutList = new ArrayList<>();
- indexTimePutList = new ArrayList<>();
- indexFilenamePutList = new ArrayList<>();
- chunkSize = 0;
- chunkCount = 0;
- rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
- rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION);
timestamp = System.currentTimeMillis();
- count = 0;
- JexlEngine jexlEngine = new JexlBuilder().create();
- jexlExpression = jexlEngine.createExpression(rateLimitExpression);
- jexlContext = new MapContext();
+ if (configuration.get(Configs.SINK_BATCH)) {
+ maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE);
+ maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT);
+ dataPutList = new ArrayList<>();
+ indexTimePutList = new ArrayList<>();
+ indexFilenamePutList = new ArrayList<>();
+ chunkSize = 0;
+ executorService = Executors.newScheduledThreadPool(1);
+ long period = configuration.getInteger(Configs.SINK_BATCH_TIME);
+ executorService.scheduleWithFixedDelay(() -> {
+ synchronized (this) {
+ // Check and flush under the same lock invoke() uses,
+ // so the timer thread cannot race an in-progress batch write.
+ if (System.currentTimeMillis() - timestamp > (period * 1000)
+ && !dataPutList.isEmpty()) {
+ sendBatchData();
+ }
+ }
+ }, period, period, TimeUnit.SECONDS);
+ }
+ rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
+ if (rateLimitThreshold > 0) {
+ rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION);
+ count = 0;
+ JexlEngine jexlEngine = new JexlBuilder().create();
+ jexlExpression = jexlEngine.createExpression(rateLimitExpression);
+ jexlContext = new MapContext();
+ }
}
@Override
public void invoke(FileChunk fileChunk, Context context) {
- chunksInCounter.inc();
- bytesInCounter.inc(fileChunk.getLength());
- if (rateLimitThreshold > 0) {
- count++;
- if (System.currentTimeMillis() - timestamp < 1000 && count > rateLimitThreshold) {
- if (checkFileChunk(fileChunk)) {
- sendFileChunk(fileChunk);
- } else {
- rateLimitDropChunksCounter.inc();
- }
- } else if (System.currentTimeMillis() - timestamp >= 1000) {
- if (checkFileChunk(fileChunk)) {
- sendFileChunk(fileChunk);
- } else {
- rateLimitDropChunksCounter.inc();
- timestamp = System.currentTimeMillis();
+ synchronized (this) {
+ long currentTimeMillis = System.currentTimeMillis();
+ chunksInCounter.inc();
+ bytesInCounter.inc(fileChunk.getLength());
+ if (rateLimitThreshold > 0) {
+ count++;
+ if (currentTimeMillis - timestamp < 1000 && count > rateLimitThreshold) {
+ if (checkFileChunk(fileChunk)) {
+ sendFileChunk(fileChunk);
+ } else {
+ rateLimitDropChunksCounter.inc();
+ }
+ } else if (currentTimeMillis - timestamp >= 1000) {
+ if (checkFileChunk(fileChunk)) {
+ sendFileChunk(fileChunk);
+ } else {
+ rateLimitDropChunksCounter.inc();
+ }
+ timestamp = currentTimeMillis;
count = 0;
+ } else {
+ sendFileChunk(fileChunk);
}
} else {
+ timestamp = currentTimeMillis;
sendFileChunk(fileChunk);
}
- } else {
- sendFileChunk(fileChunk);
}
}
@@ -208,7 +229,10 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
IoUtil.close(indexTimeTable);
IoUtil.close(indexFilenameTable);
IoUtil.close(syncHBaseConnection);
- IoUtil.close(AsyncHBaseConnection);
+ IoUtil.close(asyncHBaseConnection);
+ if (executorService != null) {
+ executorService.shutdown();
+ }
}
private void sendFileChunk(FileChunk fileChunk) {
@@ -254,72 +278,52 @@ public class HBaseSink extends RichSinkFunction {
metaPut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_LAST_MODIFIED, Bytes.toBytes(timestamp));
dataPutList.add(metaPut);
}
- chunkCount++;
chunkSize += chunkLength;
chunksOutCounter.inc();
bytesOutCounter.inc(chunkLength);
calculateFileChunkMetrics(fileChunk);
- if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
- if (isAsync) {
- if (dataPutList.size() > 0) {
- List<CompletableFuture<Object>> futures = asyncTable.batch(dataPutList);
- CompletableFuture.supplyAsync(() -> {
- for (CompletableFuture
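Note: the last hunk is truncated in this patch view; its tail continues the size-based flush that now pairs with the scheduled flush installed in open(). The overall pattern introduced by this change is roughly the following sketch; the names (sinkLock, lastWriteMillis, pendingPuts, periodSeconds, sendBatchData) mirror the fields above but are placeholders, not the literal source:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Time-or-size batching: invoke() flushes once the buffer is large enough;
    // one scheduled thread flushes whatever remains after a quiet period.
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    executor.scheduleWithFixedDelay(() -> {
        synchronized (sinkLock) { // the same monitor invoke() holds while writing
            if (System.currentTimeMillis() - lastWriteMillis > periodSeconds * 1000L
                    && !pendingPuts.isEmpty()) {
                sendBatchData(); // drains the buffered Puts into HBase, then clears them
            }
        }
    }, periodSeconds, periodSeconds, TimeUnit.SECONDS);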