diff --git a/pom.xml b/pom.xml
index 275475a..e49e6cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>file-chunk-combiner</artifactId>
- <version>24.01.18</version>
+ <version>1.1.0</version>
@@ -151,6 +151,54 @@
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
+ <dependency>
+     <groupId>org.apache.httpcomponents</groupId>
+     <artifactId>httpmime</artifactId>
+     <version>4.5.13</version>
+ </dependency>
+ <dependency>
+     <groupId>org.apache.httpcomponents</groupId>
+     <artifactId>httpasyncclient</artifactId>
+     <version>4.1.5</version>
+ </dependency>
+ <dependency>
+     <groupId>org.apache.hbase</groupId>
+     <artifactId>hbase-client</artifactId>
+     <version>2.2.3</version>
+     <exclusions>
+         <exclusion>
+             <groupId>org.apache.zookeeper</groupId>
+             <artifactId>zookeeper</artifactId>
+         </exclusion>
+         <exclusion>
+             <groupId>org.slf4j</groupId>
+             <artifactId>slf4j-log4j12</artifactId>
+         </exclusion>
+         <exclusion>
+             <groupId>org.slf4j</groupId>
+             <artifactId>log4j-over-slf4j</artifactId>
+         </exclusion>
+         <exclusion>
+             <groupId>org.apache.hadoop</groupId>
+             <artifactId>hadoop-annotations</artifactId>
+         </exclusion>
+         <exclusion>
+             <groupId>org.apache.hadoop</groupId>
+             <artifactId>hadoop-auth</artifactId>
+         </exclusion>
+     </exclusions>
+ </dependency>
+ <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-common</artifactId>
+     <version>2.8.5</version>
+     <scope>provided</scope>
+ </dependency>
+ <dependency>
+     <groupId>org.apache.commons</groupId>
+     <artifactId>commons-jexl3</artifactId>
+     <version>3.2.1</version>
+ </dependency>
@@ -160,8 +208,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
- <source>1.8</source>
- <target>1.8</target>
+ <source>11</source>
+ <target>11</target>
true
@@ -169,7 +217,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>3.1.1</version>
false
true
@@ -247,5 +294,4 @@
-
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
index 06f7402..1c5fc08 100644
--- a/src/main/java/com/zdjizhi/FileChunkCombiner.java
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -3,12 +3,12 @@ package com.zdjizhi;
import com.zdjizhi.config.Configs;
import com.zdjizhi.function.*;
import com.zdjizhi.pojo.*;
+import com.zdjizhi.sink.HBaseSink;
import com.zdjizhi.sink.HosSink;
import com.zdjizhi.kafka.KafkaConsumer;
import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger;
import com.zdjizhi.trigger.MultipleTrigger;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
@@ -24,9 +24,8 @@ import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
-public class FileChunkCombiner extends KafkaConsumer {
+public class FileChunkCombiner {
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
@@ -43,7 +42,7 @@ public class FileChunkCombiner extends KafkaConsumer {
.name("Kafka Source")
.map(new ParseMessagePackMapFunction())
.name("Map: Parse Message Pack")
- .filter((FilterFunction<FileChunk>) Objects::nonNull)
+ .filter(new FileChunkFilter(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
.assignTimestampsAndWatermarks(watermarkStrategy);
OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {
@@ -63,14 +62,23 @@ public class FileChunkCombiner extends KafkaConsumer {
.setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
.disableChaining();
- HosSink hosSink = new HosSink(configuration);
- windowStream.addSink(hosSink)
- .name("Hos")
- .setParallelism(configuration.get(Configs.SINK_HOS_PARALLELISM));
- windowStream.getSideOutput(delayedChunkOutputTag)
- .map(new SideOutputMapFunction())
- .addSink(hosSink)
- .name("Hos Delayed Chunk");
+ if ("hos".equals(configuration.get(Configs.SINK_TYPE))) {
+ windowStream.addSink(new HosSink(configuration))
+ .name("Hos")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ windowStream.getSideOutput(delayedChunkOutputTag)
+ .map(new SideOutputMapFunction())
+ .addSink(new HosSink(configuration))
+ .name("Hos Delayed Chunk");
+ } else {
+ windowStream.addSink(new HBaseSink(configuration))
+ .name("HBase")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ windowStream.getSideOutput(delayedChunkOutputTag)
+ .map(new SideOutputMapFunction())
+ .addSink(new HBaseSink(configuration))
+ .name("HBase Delayed Chunk");
+ }
environment.execute(configuration.get(Configs.FLINK_JOB_NAME));
}
diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java
index 6d79bb8..e28426e 100644
--- a/src/main/java/com/zdjizhi/config/Configs.java
+++ b/src/main/java/com/zdjizhi/config/Configs.java
@@ -9,9 +9,6 @@ public class Configs {
.defaultValue("FILE-CHUNK-COMBINER")
.withDescription("The name of job.");
- public static final ConfigOption<Integer> SOURCE_KAFKA_PARALLELISM = ConfigOptions.key("source.kafka.parallelism")
- .intType()
- .defaultValue(1);
public static final ConfigOption<String> KAFKA_BROKER = ConfigOptions.key("source.kafka.broker")
.stringType()
.noDefaultValue();
@@ -46,9 +43,6 @@ public class Configs {
.stringType()
.noDefaultValue();
- public static final ConfigOption<Integer> PARSE_MESSAGE_PACK_PARALLELISM = ConfigOptions.key("parse.message.pack.parallelism")
- .intType()
- .defaultValue(1);
public static final ConfigOption<Integer> COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism")
.intType()
.defaultValue(1);
@@ -58,13 +52,28 @@ public class Configs {
public static final ConfigOption<Long> COMBINER_WINDOW_IDLE_TIME = ConfigOptions.key("combiner.window.idle.time")
.longType()
.defaultValue(5L);
- public static final ConfigOption<Long> COMBINER_WINDOW_KEY_MAX_CHUNK = ConfigOptions.key("combiner.window.key.max.chunk")
- .longType()
- .defaultValue(5L);
- public static final ConfigOption<Integer> SINK_HOS_PARALLELISM = ConfigOptions.key("sink.hos.parallelism")
+ public static final ConfigOption<String> SINK_TYPE = ConfigOptions.key("sink.type")
+ .stringType()
+ .defaultValue("hos");
+ public static final ConfigOption<Integer> SINK_PARALLELISM = ConfigOptions.key("sink.parallelism")
.intType()
.defaultValue(1);
+ public static final ConfigOption<Boolean> SINK_ASYNC = ConfigOptions.key("sink.async")
+ .booleanType()
+ .defaultValue(false);
+ public static final ConfigOption<Boolean> SINK_BATCH = ConfigOptions.key("sink.batch")
+ .booleanType()
+ .defaultValue(false);
+ public static final ConfigOption<Integer> SINK_BATCH_COUNT = ConfigOptions.key("sink.batch.count")
+ .intType()
+ .defaultValue(1);
+ public static final ConfigOption<Long> SINK_BATCH_SIZE = ConfigOptions.key("sink.batch.size")
+ .longType()
+ .defaultValue(Long.MAX_VALUE);
+ public static final ConfigOption<Integer> SINK_HOS_LOAD_BALANCE_MODE = ConfigOptions.key("sink.hos.load.balance.mode")
+ .intType()
+ .defaultValue(0);
public static final ConfigOption<String> SINK_HOS_ENDPOINT = ConfigOptions.key("sink.hos.endpoint")
.stringType()
.noDefaultValue();
@@ -73,7 +82,7 @@ public class Configs {
.noDefaultValue();
public static final ConfigOption<String> SINK_HOS_TOKEN = ConfigOptions.key("sink.hos.token")
.stringType()
- .noDefaultValue();
+ .defaultValue("");
public static final ConfigOption<Integer> SINK_HOS_HTTP_MAX_TOTAL = ConfigOptions.key("sink.hos.http.max.total")
.intType()
.defaultValue(2000);
@@ -93,4 +102,30 @@ public class Configs {
.intType()
.defaultValue(60000);
+ public static final ConfigOption<String> SINK_HBASE_ZOOKEEPER = ConfigOptions.key("sink.hbase.zookeeper")
+ .stringType()
+ .defaultValue("");
+ public static final ConfigOption<Integer> SINK_HBASE_RETRIES_NUMBER = ConfigOptions.key("sink.hbase.retries.number")
+ .intType()
+ .defaultValue(10);
+ public static final ConfigOption<Integer> SINK_HBASE_RPC_TIMEOUT = ConfigOptions.key("sink.hbase.rpc.timeout")
+ .intType()
+ .defaultValue(600000);
+ public static final ConfigOption<Integer> SINK_HBASE_CLIENT_WRITE_BUFFER = ConfigOptions.key("sink.hbase.client.write.buffer")
+ .intType()
+ .defaultValue(10485760);
+ public static final ConfigOption<Integer> SINK_HBASE_CLIENT_IPC_POOL_SIZE = ConfigOptions.key("sink.hbase.client.ipc.pool.size")
+ .intType()
+ .defaultValue(1);
+
+ public static final ConfigOption<Integer> FILE_MAX_CHUNK_COUNT = ConfigOptions.key("file.max.chunk.count")
+ .intType()
+ .defaultValue(100000);
+ public static final ConfigOption<Long> FILE_MAX_SIZE = ConfigOptions.key("file.max.size")
+ .longType()
+ .defaultValue(10737418240L);
+ public static final ConfigOption<String> FILTER_EXPRESSION = ConfigOptions.key("filter.expression")
+ .stringType()
+ .defaultValue("");
+
}
diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
index 8f0f40d..5eb134d 100644
--- a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
+++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
@@ -10,7 +10,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.List;
+import java.util.*;
public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {
@@ -35,8 +35,8 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow>
- public void process(String string, Context context, Iterable<FileChunk> elements, Collector<FileChunk> out) throws Exception {
- List<FileChunk> fileChunks = PublicUtil.combine(elements, configuration.get(Configs.COMBINER_WINDOW_KEY_MAX_CHUNK), duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ public void process(String string, Context context, Iterable<FileChunk> elements, Collector<FileChunk> out) {
+ List<FileChunk> fileChunks = PublicUtil.combine(elements, configuration.get(Configs.FILE_MAX_CHUNK_COUNT), duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
for (FileChunk fileChunk : fileChunks) {
out.collect(fileChunk);
}
diff --git a/src/main/java/com/zdjizhi/function/FileChunkFilter.java b/src/main/java/com/zdjizhi/function/FileChunkFilter.java
new file mode 100644
index 0000000..e0af784
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/FileChunkFilter.java
@@ -0,0 +1,48 @@
+package com.zdjizhi.function;
+
+import cn.hutool.core.util.StrUtil;
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.commons.jexl3.*;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+public class FileChunkFilter extends RichFilterFunction<FileChunk> {
+ private final long maxFileSize;
+ private final String filterExpression;
+ private transient Counter filterChunkCounter;
+ private JexlExpression jexlExpression;
+ private JexlContext jexlContext;
+
+ public FileChunkFilter(long maxFileSize, String filterExpression) {
+ this.maxFileSize = maxFileSize;
+ this.filterExpression = filterExpression;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ filterChunkCounter = metricGroup.counter("filterChunkCount");
+ JexlEngine jexlEngine = new JexlBuilder().create();
+ jexlExpression = jexlEngine.createExpression(filterExpression);
+ jexlContext = new MapContext();
+ }
+
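+ // A chunk is dropped when it is null, when its offset lies beyond the configured
+ // maximum file size, or when the optional JEXL filter expression (evaluated with
+ // the chunk bound to the variable "FileChunk") does not return true.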
+ @Override
+ public boolean filter(FileChunk value) {
+ if (value == null || value.getOffset() > maxFileSize) {
+ filterChunkCounter.inc();
+ return false;
+ }
+ if (StrUtil.isNotEmpty(filterExpression)) {
+ jexlContext.set(value.getClass().getSimpleName(), value);
+ if (!Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
+ filterChunkCounter.inc();
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java
new file mode 100644
index 0000000..15c2c01
--- /dev/null
+++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java
@@ -0,0 +1,210 @@
+package com.zdjizhi.sink;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.pojo.FileChunk;
+import com.zdjizhi.utils.HBaseColumnConstants;
+import com.zdjizhi.utils.HBaseConnectionUtil;
+import com.zdjizhi.utils.PublicConstants;
+import com.zdjizhi.utils.PublicUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
+import static com.zdjizhi.utils.PublicConstants.*;
+import static com.zdjizhi.utils.HBaseColumnConstants.*;
+
+public class HBaseSink extends RichSinkFunction<FileChunk> {
+ private static final Log LOG = LogFactory.get();
+
+ private final Configuration configuration;
+ private transient Counter sendHBaseCounter;
+ private transient Counter sendHBaseErrorCounter;
+ private transient Counter sendHBaseFileCounter;
+ private transient Counter sendHBaseChunkCounter;
+ private boolean isAsync;
+ private Connection syncHBaseConnection;
+ private AsyncConnection asyncHBaseConnection;
+ private Table table;
+ private Table indexTimeTable;
+ private Table indexFilenameTable;
+ private AsyncTable<AdvancedScanResultConsumer> asyncTable;
+ private AsyncTable<AdvancedScanResultConsumer> asyncIndexTimeTable;
+ private AsyncTable<AdvancedScanResultConsumer> asyncIndexFilenameTable;
+ private List<Put> dataPutList;
+ private List<Put> indexTimePutList;
+ private List<Put> indexFilenamePutList;
+ private long chunkSize;
+ private int chunkCount;
+ private long maxBatchSize;
+ private long maxBatchCount;
+
+ public HBaseSink(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ sendHBaseCounter = metricGroup.counter("sendHBaseCount");
+ sendHBaseErrorCounter = metricGroup.counter("sendHBaseErrorCount");
+ sendHBaseFileCounter = metricGroup.counter("sendHBaseFileCount");
+ sendHBaseChunkCounter = metricGroup.counter("sendHBaseChunkCount");
+ isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
+ if (isAsync) {
+ asyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection();
+ asyncTable = asyncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ asyncIndexTimeTable = asyncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ asyncIndexFilenameTable = asyncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ } else {
+ syncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getSyncHBaseConnection();
+ table = syncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ indexTimeTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ indexFilenameTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ }
+ maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE);
+ maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT);
+ dataPutList = new ArrayList<>();
+ indexTimePutList = new ArrayList<>();
+ indexFilenamePutList = new ArrayList<>();
+ chunkSize = 0;
+ chunkCount = 0;
+ }
+
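+ // Turns each FileChunk into HBase Puts (data, metadata, and index rows) and
+ // flushes the buffered Puts once sink.batch.size bytes or sink.batch.count
+ // chunks have accumulated.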
+ @Override
+ public void invoke(FileChunk fileChunk, Context context) {
+ if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode()) && configuration.get(Configs.SINK_BATCH)) {
+ sendHBaseChunkCounter.inc();
+ byte[] data = "".getBytes();
+ if (fileChunk.getChunk() != null) {
+ data = fileChunk.getChunk();
+ }
+ long timestamp = System.currentTimeMillis();
+ Map<String, String> partMessageMap = new HashMap<>();
+ partMessageMap.put(APPEND_FILE_PART_MESSAGE_CHUNK_COUNT, fileChunk.getChunkCount() + "");
+ partMessageMap.put(APPEND_FILE_PART_MESSAGE_LAST_PART_FLAG, fileChunk.getLastChunkFlag() + "");
+ partMessageMap.put(APPEND_FILE_PART_MESSAGE_SIZE, data.length + "");
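+ // Data row: one row per file (row key + data-row suffix); each chunk is stored
+ // under a column qualifier equal to its byte offset, with per-chunk metadata alongside.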
+ Put dataPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid()) + PublicConstants.FILE_DATA_ROW_SUFFIX));
+ dataPut.addColumn(BYTE_FAMILY_DATA, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), data);
+ dataPut.addColumn(BYTE_FAMILY_META, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), Bytes.toBytes(partMessageMap.toString()));
+ dataPutList.add(dataPut);
+ if (fileChunk.getOffset() == 0) {
+ Put metaPut = new Put(PublicUtil.getRowKey(fileChunk.getUuid()).getBytes());
+ String filename = fileChunk.getUuid() + "." + fileChunk.getFileType();
+ String randomIndexHead = PublicUtil.getIndexDataHead(filename);
+ String indexTimeKey = randomIndexHead + "|" + timestamp + "|" + PublicUtil.getRowKey(fileChunk.getUuid());
+ String indexFilenameKey = randomIndexHead + "|" + filename;
+ metaPut.addColumn(BYTE_FAMILY_META, Bytes.toBytes(COLUMN_USER_DEFINED_META_PREFIX + FILE_META_FILE_TYPE), Bytes.toBytes(fileChunk.getFileType()));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_CONTENT_TYPE, Bytes.toBytes("application/octet-stream"));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_FILENAME, Bytes.toBytes(filename));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_UPLOAD_METHOD, Bytes.toBytes(String.valueOf(5)));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_INDEX_TIME_KEY, Bytes.toBytes(indexTimeKey));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_INDEX_FILENAME_KEY, Bytes.toBytes(indexFilenameKey));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_LAST_MODIFIED, Bytes.toBytes(timestamp));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_APPEND_MODE, Bytes.toBytes(APPEND_MODE_OFFSET));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_COMBINE_MODE, Bytes.toBytes(fileChunk.getCombineMode()));
+ dataPutList.add(metaPut);
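+ // Secondary index rows: one keyed by upload time and one by filename, each
+ // pointing back to the file's UUID.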
+ Put indexTimePut = new Put(Bytes.toBytes(indexTimeKey));
+ indexTimePut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_FILENAME, Bytes.toBytes(fileChunk.getUuid()));
+ indexTimePutList.add(indexTimePut);
+ Put indexFilenamePut = new Put(Bytes.toBytes(indexFilenameKey));
+ indexFilenamePut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_FILENAME, Bytes.toBytes(fileChunk.getUuid()));
+ indexFilenamePutList.add(indexFilenamePut);
+ sendHBaseFileCounter.inc();
+ } else {
+ Put metaPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid())));
+ metaPut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_LAST_MODIFIED, Bytes.toBytes(timestamp));
+ dataPutList.add(metaPut);
+ }
+ chunkCount++;
+ chunkSize += data.length;
+ if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
+ if (isAsync) {
+ if (dataPutList.size() > 0) {
+ List<CompletableFuture<Object>> futures = asyncTable.batch(dataPutList);
+ sendHBaseCounter.inc();
+ CompletableFuture.supplyAsync(() -> {
+ for (CompletableFuture