From 644ca7f35cfd9c2bb92b0ceeddef7ce99cc43f4c Mon Sep 17 00:00:00 2001 From: houjinchuan Date: Thu, 29 Feb 2024 19:03:07 +0800 Subject: [PATCH] =?UTF-8?q?[GAL-504]=20=E4=BC=98=E5=8C=96File=20Chunk=20Co?= =?UTF-8?q?mbiner=E6=80=A7=E8=83=BD=E5=8F=8A=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 56 ++++- .../java/com/zdjizhi/FileChunkCombiner.java | 32 ++- src/main/java/com/zdjizhi/config/Configs.java | 57 ++++- .../CombineChunkProcessWindowFunction.java | 6 +- .../com/zdjizhi/function/FileChunkFilter.java | 48 ++++ src/main/java/com/zdjizhi/sink/HBaseSink.java | 210 +++++++++++++++++ src/main/java/com/zdjizhi/sink/HosSink.java | 213 +++++++++++++++++- .../zdjizhi/utils/HBaseColumnConstants.java | 50 ++++ .../zdjizhi/utils/HBaseConnectionUtil.java | 59 +++++ .../com/zdjizhi/utils/HttpClientUtil.java | 87 ++++--- .../zdjizhi/utils/HttpHeaderConstants.java | 54 +++++ .../com/zdjizhi/utils/PublicConstants.java | 33 +++ .../java/com/zdjizhi/utils/PublicUtil.java | 77 ++----- src/main/resources/common.properties | 40 +++- 14 files changed, 888 insertions(+), 134 deletions(-) create mode 100644 src/main/java/com/zdjizhi/function/FileChunkFilter.java create mode 100644 src/main/java/com/zdjizhi/sink/HBaseSink.java create mode 100644 src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java create mode 100644 src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java create mode 100644 src/main/java/com/zdjizhi/utils/HttpHeaderConstants.java create mode 100644 src/main/java/com/zdjizhi/utils/PublicConstants.java diff --git a/pom.xml b/pom.xml index 275475a..e49e6cf 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi file-chunk-combiner - 24.01.18 + 1.1.0 @@ -151,6 +151,54 @@ httpclient 4.5.13 + + org.apache.httpcomponents + httpmime + 4.5.13 + + + org.apache.httpcomponents + httpasyncclient + 4.1.5 + + + org.apache.hbase + hbase-client + 2.2.3 + + + org.apache.zookeeper + zookeeper + + + org.slf4j + slf4j-log4j12 + + + org.slf4j + log4j-over-slf4j + + + org.apache.hadoop + hadoop-annotations + + + org.apache.hadoop + hadoop-auth + + + + + org.apache.hadoop + hadoop-common + 2.8.5 + provided + + + org.apache.commons + commons-jexl3 + 3.2.1 + @@ -160,8 +208,8 @@ maven-compiler-plugin 3.6.1 - 1.8 - 1.8 + 11 + 11 true @@ -169,7 +217,6 @@ org.apache.maven.plugins maven-shade-plugin - 3.1.1 false true @@ -247,5 +294,4 @@ - \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java index 06f7402..1c5fc08 100644 --- a/src/main/java/com/zdjizhi/FileChunkCombiner.java +++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java @@ -3,12 +3,12 @@ package com.zdjizhi; import com.zdjizhi.config.Configs; import com.zdjizhi.function.*; import com.zdjizhi.pojo.*; +import com.zdjizhi.sink.HBaseSink; import com.zdjizhi.sink.HosSink; import com.zdjizhi.kafka.KafkaConsumer; import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger; import com.zdjizhi.trigger.MultipleTrigger; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; @@ -24,9 +24,8 @@ import org.apache.flink.util.OutputTag; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Objects; -public class FileChunkCombiner extends 
KafkaConsumer { +public class FileChunkCombiner { public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]); @@ -43,7 +42,7 @@ public class FileChunkCombiner extends KafkaConsumer { .name("Kafka Source") .map(new ParseMessagePackMapFunction()) .name("Map: Parse Message Pack") - .filter((FilterFunction) Objects::nonNull) + .filter(new FileChunkFilter(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION))) .assignTimestampsAndWatermarks(watermarkStrategy); OutputTag delayedChunkOutputTag = new OutputTag("delayed-chunk") { @@ -63,14 +62,23 @@ public class FileChunkCombiner extends KafkaConsumer { .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM)) .disableChaining(); - HosSink hosSink = new HosSink(configuration); - windowStream.addSink(hosSink) - .name("Hos") - .setParallelism(configuration.get(Configs.SINK_HOS_PARALLELISM)); - windowStream.getSideOutput(delayedChunkOutputTag) - .map(new SideOutputMapFunction()) - .addSink(hosSink) - .name("Hos Delayed Chunk"); + if ("hos".equals(configuration.get(Configs.SINK_TYPE))) { + windowStream.addSink(new HosSink(configuration)) + .name("Hos") + .setParallelism(configuration.get(Configs.SINK_PARALLELISM)); + windowStream.getSideOutput(delayedChunkOutputTag) + .map(new SideOutputMapFunction()) + .addSink(new HosSink(configuration)) + .name("Hos Delayed Chunk"); + } else { + windowStream.addSink(new HBaseSink(configuration)) + .name("HBase") + .setParallelism(configuration.get(Configs.SINK_PARALLELISM)); + windowStream.getSideOutput(delayedChunkOutputTag) + .map(new SideOutputMapFunction()) + .addSink(new HBaseSink(configuration)) + .name("HBase Delayed Chunk"); + } environment.execute(configuration.get(Configs.FLINK_JOB_NAME)); } diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java index 6d79bb8..e28426e 100644 --- a/src/main/java/com/zdjizhi/config/Configs.java +++ b/src/main/java/com/zdjizhi/config/Configs.java @@ -9,9 +9,6 @@ public class Configs { .defaultValue("FILE-CHUNK-COMBINER") .withDescription("The name of job."); - public static final ConfigOption SOURCE_KAFKA_PARALLELISM = ConfigOptions.key("source.kafka.parallelism") - .intType() - .defaultValue(1); public static final ConfigOption KAFKA_BROKER = ConfigOptions.key("source.kafka.broker") .stringType() .noDefaultValue(); @@ -46,9 +43,6 @@ public class Configs { .stringType() .noDefaultValue(); - public static final ConfigOption PARSE_MESSAGE_PACK_PARALLELISM = ConfigOptions.key("parse.message.pack.parallelism") - .intType() - .defaultValue(1); public static final ConfigOption COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism") .intType() .defaultValue(1); @@ -58,13 +52,28 @@ public class Configs { public static final ConfigOption COMBINER_WINDOW_IDLE_TIME = ConfigOptions.key("combiner.window.idle.time") .longType() .defaultValue(5L); - public static final ConfigOption COMBINER_WINDOW_KEY_MAX_CHUNK = ConfigOptions.key("combiner.window.key.max.chunk") - .longType() - .defaultValue(5L); - public static final ConfigOption SINK_HOS_PARALLELISM = ConfigOptions.key("sink.hos.parallelism") + public static final ConfigOption SINK_TYPE = ConfigOptions.key("sink.type") + .stringType() + .defaultValue("hos"); + public static final ConfigOption SINK_PARALLELISM = ConfigOptions.key("sink.parallelism") .intType() .defaultValue(1); + public static final ConfigOption SINK_ASYNC = 
ConfigOptions.key("sink.async") + .booleanType() + .defaultValue(false); + public static final ConfigOption SINK_BATCH = ConfigOptions.key("sink.batch") + .booleanType() + .defaultValue(false); + public static final ConfigOption SINK_BATCH_COUNT = ConfigOptions.key("sink.batch.count") + .intType() + .defaultValue(1); + public static final ConfigOption SINK_BATCH_SIZE = ConfigOptions.key("sink.batch.size") + .longType() + .defaultValue(Long.MAX_VALUE); + public static final ConfigOption SINK_HOS_LOAD_BALANCE_MODE = ConfigOptions.key("sink.hos.load.balance.mode") + .intType() + .defaultValue(0); public static final ConfigOption SINK_HOS_ENDPOINT = ConfigOptions.key("sink.hos.endpoint") .stringType() .noDefaultValue(); @@ -73,7 +82,7 @@ public class Configs { .noDefaultValue(); public static final ConfigOption SINK_HOS_TOKEN = ConfigOptions.key("sink.hos.token") .stringType() - .noDefaultValue(); + .defaultValue(""); public static final ConfigOption SINK_HOS_HTTP_MAX_TOTAL = ConfigOptions.key("sink.hos.http.max.total") .intType() .defaultValue(2000); @@ -93,4 +102,30 @@ public class Configs { .intType() .defaultValue(60000); + public static final ConfigOption SINK_HBASE_ZOOKEEPER = ConfigOptions.key("sink.hbase.zookeeper") + .stringType() + .defaultValue(""); + public static final ConfigOption SINK_HBASE_RETRIES_NUMBER = ConfigOptions.key("sink.hbase.retries.number") + .intType() + .defaultValue(10); + public static final ConfigOption SINK_HBASE_RPC_TIMEOUT = ConfigOptions.key("sink.hbase.rpc.timeout") + .intType() + .defaultValue(600000); + public static final ConfigOption SINK_HBASE_CLIENT_WRITE_BUFFER = ConfigOptions.key("sink.hbase.client.write.buffer") + .intType() + .defaultValue(10485760); + public static final ConfigOption SINK_HBASE_CLIENT_IPC_POOL_SIZE = ConfigOptions.key("sink.hbase.client.ipc.pool.size") + .intType() + .defaultValue(1); + + public static final ConfigOption FILE_MAX_CHUNK_COUNT = ConfigOptions.key("file.max.chunk.count") + .intType() + .defaultValue(100000); + public static final ConfigOption FILE_MAX_SIZE = ConfigOptions.key("file.max.size") + .longType() + .defaultValue(10737418240L); + public static final ConfigOption FILTER_EXPRESSION = ConfigOptions.key("filter.expression") + .stringType() + .defaultValue(""); + } diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java index 8f0f40d..5eb134d 100644 --- a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java +++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java @@ -10,7 +10,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.List; +import java.util.*; public class CombineChunkProcessWindowFunction extends ProcessWindowFunction { @@ -35,8 +35,8 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction elements, Collector out) throws Exception { - List fileChunks = PublicUtil.combine(elements, configuration.get(Configs.COMBINER_WINDOW_KEY_MAX_CHUNK), duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter); + public void process(String string, Context context, Iterable elements, Collector out) { + List fileChunks = PublicUtil.combine(elements, configuration.get(Configs.FILE_MAX_CHUNK_COUNT), duplicateChunkCounter, combineErrorCounter, seekChunkCounter, 
appendChunkCounter);
         for (FileChunk fileChunk : fileChunks) {
             out.collect(fileChunk);
         }
diff --git a/src/main/java/com/zdjizhi/function/FileChunkFilter.java b/src/main/java/com/zdjizhi/function/FileChunkFilter.java
new file mode 100644
index 0000000..e0af784
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/FileChunkFilter.java
@@ -0,0 +1,51 @@
+package com.zdjizhi.function;
+
+import cn.hutool.core.util.StrUtil;
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.commons.jexl3.*;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+public class FileChunkFilter extends RichFilterFunction<FileChunk> {
+    private final long maxFileSize;
+    private final String filterExpression;
+    private transient Counter filterChunkCounter;
+    private transient JexlExpression jexlExpression;
+    private transient JexlContext jexlContext;
+
+    public FileChunkFilter(long maxFileSize, String filterExpression) {
+        this.maxFileSize = maxFileSize;
+        this.filterExpression = filterExpression;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+        filterChunkCounter = metricGroup.counter("filterChunkCount");
+        // Only compile when an expression is configured; createExpression("") would throw.
+        if (StrUtil.isNotEmpty(filterExpression)) {
+            JexlEngine jexlEngine = new JexlBuilder().create();
+            jexlExpression = jexlEngine.createExpression(filterExpression);
+            jexlContext = new MapContext();
+        }
+    }
+
+    @Override
+    public boolean filter(FileChunk value) {
+        if (value == null || value.getOffset() > maxFileSize) {
+            filterChunkCounter.inc();
+            return false;
+        }
+        if (jexlExpression != null) {
+            jexlContext.set(value.getClass().getSimpleName(), value);
+            if (!Boolean.parseBoolean(String.valueOf(jexlExpression.evaluate(jexlContext)))) {
+                filterChunkCounter.inc();
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java
new file mode 100644
index 0000000..15c2c01
--- /dev/null
+++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java
@@ -0,0 +1,210 @@
+package com.zdjizhi.sink;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.pojo.FileChunk;
+import com.zdjizhi.utils.HBaseColumnConstants;
+import com.zdjizhi.utils.HBaseConnectionUtil;
+import com.zdjizhi.utils.PublicConstants;
+import com.zdjizhi.utils.PublicUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
+import static com.zdjizhi.utils.PublicConstants.*;
+import static com.zdjizhi.utils.HBaseColumnConstants.*;
+
+public class HBaseSink extends RichSinkFunction<FileChunk> {
+    private static final Log LOG = LogFactory.get();
+
+    private final Configuration configuration;
+    private transient Counter sendHBaseCounter;
+    private transient Counter sendHBaseErrorCounter;
+    private transient Counter sendHBaseFileCounter;
+    private transient Counter sendHBaseChunkCounter;
+    private boolean isAsync;
+    private Connection syncHBaseConnection;
+    private AsyncConnection 
AsyncHBaseConnection; + private Table table; + private Table indexTimeTable; + private Table indexFilenameTable; + private AsyncTable asyncTable; + private AsyncTable asyncIndexTimeTable; + private AsyncTable asyncIndexFilenameTable; + private List dataPutList; + private List indexTimePutList; + private List indexFilenamePutList; + private long chunkSize; + private int chunkCount; + private long maxBatchSize; + private long maxBatchCount; + + public HBaseSink(Configuration configuration) { + this.configuration = configuration; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + MetricGroup metricGroup = getRuntimeContext().getMetricGroup(); + sendHBaseCounter = metricGroup.counter("sendHBaseCount"); + sendHBaseErrorCounter = metricGroup.counter("sendHBaseErrorCount"); + sendHBaseFileCounter = metricGroup.counter("sendHBaseFileCount"); + sendHBaseChunkCounter = metricGroup.counter("sendHBaseChunkCount"); + isAsync = configuration.getBoolean(Configs.SINK_ASYNC); + if (isAsync) { + AsyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection(); + asyncTable = AsyncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET))); + asyncIndexTimeTable = AsyncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET))); + asyncIndexFilenameTable = AsyncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET))); + } else { + syncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getSyncHBaseConnection(); + table = syncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET))); + indexTimeTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET))); + indexFilenameTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET))); + } + maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE); + maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT); + dataPutList = new ArrayList<>(); + indexTimePutList = new ArrayList<>(); + indexFilenamePutList = new ArrayList<>(); + chunkSize = 0; + chunkCount = 0; + } + + @Override + public void invoke(FileChunk fileChunk, Context context) { + if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode()) && configuration.get(Configs.SINK_BATCH)) { + sendHBaseChunkCounter.inc(); + byte[] data = "".getBytes(); + if (fileChunk.getChunk() != null) { + data = fileChunk.getChunk(); + } + long timestamp = System.currentTimeMillis(); + Map partMessageMap = new HashMap<>(); + partMessageMap.put(APPEND_FILE_PART_MESSAGE_CHUNK_COUNT, fileChunk.getChunkCount() + ""); + partMessageMap.put(APPEND_FILE_PART_MESSAGE_LAST_PART_FLAG, fileChunk.getLastChunkFlag() + ""); + partMessageMap.put(APPEND_FILE_PART_MESSAGE_SIZE, data.length + ""); + Put dataPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid()) + PublicConstants.FILE_DATA_ROW_SUFFIX)); + dataPut.addColumn(BYTE_FAMILY_DATA, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), data); + dataPut.addColumn(BYTE_FAMILY_META, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), Bytes.toBytes(partMessageMap.toString())); + dataPutList.add(dataPut); + if (fileChunk.getOffset() == 0) { + Put metaPut = new Put(PublicUtil.getRowKey(fileChunk.getUuid()).getBytes()); + String filename = 
fileChunk.getUuid() + "." + fileChunk.getFileType();
+                String randomIndexHead = PublicUtil.getIndexDataHead(filename);
+                String indexTimeKey = randomIndexHead + "|" + timestamp + "|" + PublicUtil.getRowKey(fileChunk.getUuid());
+                String indexFilenameKey = randomIndexHead + "|" + filename;
+                metaPut.addColumn(BYTE_FAMILY_META, Bytes.toBytes(COLUMN_USER_DEFINED_META_PREFIX + FILE_META_FILE_TYPE), Bytes.toBytes(fileChunk.getFileType()));
+                metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_CONTENT_TYPE, Bytes.toBytes("application/octet-stream"));
+                metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_FILENAME, Bytes.toBytes(filename));
+                metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_UPLOAD_METHOD, Bytes.toBytes(String.valueOf(5)));
+                // index_filename_key stores the filename-index row key, index_time_key the time-index row key
+                metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_INDEX_FILENAME_KEY, Bytes.toBytes(indexFilenameKey));
+                metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_INDEX_TIME_KEY, Bytes.toBytes(indexTimeKey));
+                metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_LAST_MODIFIED, Bytes.toBytes(timestamp));
+                metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_APPEND_MODE, Bytes.toBytes(APPEND_MODE_OFFSET));
+                metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_COMBINE_MODE, Bytes.toBytes(fileChunk.getCombineMode()));
+                dataPutList.add(metaPut);
+                Put indexTimePut = new Put(Bytes.toBytes(indexTimeKey));
+                indexTimePut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_FILENAME, Bytes.toBytes(fileChunk.getUuid()));
+                indexTimePutList.add(indexTimePut);
+                Put indexFilenamePut = new Put(Bytes.toBytes(indexFilenameKey));
+                indexFilenamePut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_FILENAME, Bytes.toBytes(fileChunk.getUuid()));
+                indexFilenamePutList.add(indexFilenamePut);
+                sendHBaseFileCounter.inc();
+            } else {
+                Put metaPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid())));
+                metaPut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_LAST_MODIFIED, Bytes.toBytes(timestamp));
+                dataPutList.add(metaPut);
+            }
+            chunkCount++;
+            chunkSize += data.length;
+            if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
+                if (isAsync) {
+                    if (dataPutList.size() > 0) {
+                        sendHBaseCounter.inc();
+                        for (CompletableFuture<Object> future : asyncTable.batch(dataPutList)) {
+                            future.whenComplete((result, error) -> {
+                                if (error != null) {
+                                    LOG.error(error, "put chunk to hbase error.");
+                                    sendHBaseErrorCounter.inc();
+                                }
+                            });
+                        }
+                        dataPutList.clear();
+                    }
+                    if (indexTimePutList.size() > 0) {
+                        asyncIndexTimeTable.batch(indexTimePutList);
+                        sendHBaseCounter.inc();
+                        indexTimePutList.clear();
+                    }
+                    if (indexFilenamePutList.size() > 0) {
+                        asyncIndexFilenameTable.batch(indexFilenamePutList);
+                        sendHBaseCounter.inc();
+                        indexFilenamePutList.clear();
+                    }
+                } else {
+                    if (dataPutList.size() > 0) {
+                        try {
+                            sendHBaseCounter.inc();
+                            table.batch(dataPutList, null);
+                        } catch (IOException | InterruptedException e) {
+                            LOG.error(e, "put chunk to hbase data table error.");
+                            sendHBaseErrorCounter.inc();
+                        } finally {
+                            dataPutList.clear();
+                        }
+                    }
+                    if (indexTimePutList.size() > 0) {
+                        try {
+                            sendHBaseCounter.inc();
+                            indexTimeTable.batch(indexTimePutList, null);
+                        } catch (IOException | InterruptedException e) {
+                            LOG.error(e, "put chunk to hbase index time table error.");
+                            sendHBaseErrorCounter.inc();
+                        } finally {
+                            indexTimePutList.clear();
+                        }
+                    }
+                    if (indexFilenamePutList.size() > 0) {
+                        try {
+                            sendHBaseCounter.inc();
+                            indexFilenameTable.batch(indexFilenamePutList, null);
+                        } catch (IOException | InterruptedException e) {
+                            LOG.error(e, "put chunk to hbase index filename table error.");
+                            sendHBaseErrorCounter.inc();
+                        } finally {
+                            indexFilenamePutList.clear();
+                        }
+                    }
+                }
+                chunkSize = 0;
+                chunkCount = 0;
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        IoUtil.close(table);
+        IoUtil.close(indexTimeTable);
+        IoUtil.close(indexFilenameTable);
+        IoUtil.close(syncHBaseConnection);
+        IoUtil.close(AsyncHBaseConnection);
+    }
+
+}
diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java
index f49a43b..502c678 100644
--- a/src/main/java/com/zdjizhi/sink/HosSink.java
+++ b/src/main/java/com/zdjizhi/sink/HosSink.java
@@ -1,19 +1,61 @@
 package com.zdjizhi.sink;
 
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.util.*;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.config.Configs;
 import com.zdjizhi.pojo.FileChunk;
 import com.zdjizhi.utils.HttpClientUtil;
 import com.zdjizhi.utils.PublicUtil;
+import org.apache.commons.lang.CharEncoding;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.HttpHostConnectException;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.util.EntityUtils;
 
 import java.io.IOException;
+import java.net.ConnectException;
+import java.util.*;
+
+import static com.zdjizhi.utils.HttpHeaderConstants.*;
+import static com.zdjizhi.utils.PublicConstants.*;
 
 public class HosSink extends RichSinkFunction<FileChunk> {
+    private static final Log LOG = LogFactory.get();
 
     private final Configuration configuration;
+    private transient Counter sendHosCounter;
     private transient Counter sendHosErrorCounter;
+    private transient Counter sendHosFileCounter;
+    private transient Counter sendHosChunkCounter;
+    private boolean isAsync;
+    private CloseableHttpClient syncHttpClient;
+    private CloseableHttpAsyncClient asyncHttpClient;
+    private int loadBalanceMode;
+    private volatile String endpoint;
+    private List<String> ipList;
+    private List<String> portList;
+    private String token;
+    private volatile String bathPutUrl;
+    private HashMap<String, String> hosMessage;
+    private String objectsMeta = "";
+    private String objectsOffset = "";
+    private List<byte[]> byteList;
+    private long maxBatchSize;
+    private long maxBatchCount;
+    private long chunkSize = 0;
+    private int chunkCount = 0;
 
     public HosSink(Configuration configuration) {
         this.configuration = configuration;
@@ -23,17 +65,182 @@
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+        sendHosCounter = metricGroup.counter("sendHosCount");
         sendHosErrorCounter = metricGroup.counter("sendHosErrorCount");
+        sendHosFileCounter = 
metricGroup.counter("sendHosFileCount"); + sendHosChunkCounter = metricGroup.counter("sendHosChunkCount"); + loadBalanceMode = configuration.getInteger(Configs.SINK_HOS_LOAD_BALANCE_MODE); + if (loadBalanceMode == 0) { + endpoint = configuration.getString(Configs.SINK_HOS_ENDPOINT); + } else if (loadBalanceMode == 1) { + String[] ipPortArr = configuration.get(Configs.SINK_HOS_ENDPOINT).split(":"); + ipList = Arrays.asList(ipPortArr[0].split(",")); + portList = Arrays.asList(ipPortArr[1].split(",")); + endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size())); + } + token = configuration.get(Configs.SINK_HOS_TOKEN); + isAsync = configuration.getBoolean(Configs.SINK_ASYNC); + if (isAsync) { + asyncHttpClient = HttpClientUtil.getInstance(configuration).getAsyncHttpClient(); + asyncHttpClient.start(); + } else { + syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient(); + } + bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile"; + maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE); + maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT); + hosMessage = new HashMap<>(); + objectsMeta = ""; + objectsOffset = ""; + byteList = new ArrayList<>(); } @Override public void invoke(FileChunk fileChunk, Context context) { - PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter); + byte[] data = "".getBytes(); + if (fileChunk.getChunk() != null) { + data = fileChunk.getChunk(); + } + sendHosChunkCounter.inc(); + if (configuration.get(Configs.SINK_BATCH)) { + hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType()); + hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid()); + if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) { + hosMessage.put(HOS_OFFSET, fileChunk.getOffset() + ""); + hosMessage.put(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + ""); + if (fileChunk.getOffset() == 0) { + sendHosFileCounter.inc(); + } + } else { + hosMessage.put(HOS_PART_NUMBER, fileChunk.getTimestamp() + ""); + hosMessage.put(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers()); + } + hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + ""); + Map metaMap = fileChunk.getMeta(); + if (metaMap != null && metaMap.size() > 0) { + for (String meta : metaMap.keySet()) { + hosMessage.put(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + ""); + } + } + objectsMeta += hosMessage.toString() + ";"; + hosMessage.clear(); + objectsOffset += data.length + ";"; + byteList.add(data); + chunkCount++; + chunkSize += data.length; + if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) { + HttpPut httpPut = new HttpPut(bathPutUrl); + httpPut.setHeader(TOKEN, token); + httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2); + httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode()); + httpPut.setHeader(HOS_OBJECTS_META, objectsMeta); + httpPut.setHeader(HOS_OBJECTS_OFFSET, objectsOffset); + byte[][] bytes = new byte[byteList.size()][]; + byteList.toArray(bytes); + byte[] newData = ArrayUtil.addAll(bytes); + httpPut.setEntity(new ByteArrayEntity(newData)); + byteList.clear(); + executeRequest(httpPut); + objectsMeta = ""; + objectsOffset = ""; + chunkSize = 0; + chunkCount = 0; + } + } else { + String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid()); + HttpPut httpPut = new HttpPut(url); + 
httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN)); + httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2); + httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode()); + String filename = fileChunk.getFileName(); + if (StrUtil.isNotEmpty(filename) && filename.contains(".")) { + httpPut.setHeader(HOS_META_FILENAME, filename); + } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) { + filename = filename + "." + fileChunk.getFileType(); + httpPut.setHeader(HOS_META_FILENAME, filename); + } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) { + httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType()); + } + if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) { + httpPut.setHeader(HOS_OFFSET, fileChunk.getOffset() + ""); + httpPut.setHeader(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + ""); + if (fileChunk.getOffset() == 0) { + sendHosFileCounter.inc(); + } + } else { + httpPut.setHeader(HOS_PART_NUMBER, fileChunk.getTimestamp() + ""); + httpPut.setHeader(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers()); + } + httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + ""); + Map metaMap = fileChunk.getMeta(); + if (metaMap != null && metaMap.size() > 0) { + for (String meta : metaMap.keySet()) { + httpPut.setHeader(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + ""); + } + } + httpPut.setEntity(new ByteArrayEntity(fileChunk.getChunk())); + executeRequest(httpPut); + } } @Override - public void close() throws IOException { - HttpClientUtil.getInstance(null).close(); + public void close() { + IoUtil.close(syncHttpClient); + IoUtil.close(asyncHttpClient); + } + + private void executeRequest(HttpPut httpPut) { + sendHosCounter.inc(); + if (isAsync) { + asyncHttpClient.execute(httpPut, new FutureCallback() { + @Override + public void completed(HttpResponse httpResponse) { + try { + if (httpResponse.getStatusLine().getStatusCode() != 200) { + String responseEntity = EntityUtils.toString(httpResponse.getEntity(), CharEncoding.UTF_8); + LOG.error("put part to hos error. code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity); + sendHosErrorCounter.inc(); + } + } catch (IOException e) { + LOG.error("put part to hos error.", e); + sendHosErrorCounter.inc(); + } + } + + @Override + public void failed(Exception ex) { + LOG.error("put part to hos error.", ex); + sendHosErrorCounter.inc(); + if (loadBalanceMode == 1 && ex instanceof ConnectException) { + endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size())); + bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile"; + } + } + + @Override + public void cancelled() { + + } + }); + } else { + CloseableHttpResponse response = null; + try { + response = syncHttpClient.execute(httpPut); + if (response.getStatusLine().getStatusCode() != 200) { + String responseEntity = EntityUtils.toString(response.getEntity(), CharEncoding.UTF_8); + LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". 
message: " + responseEntity); + sendHosErrorCounter.inc(); + } + } catch (IOException e) { + LOG.error("put part to hos error.", e); + sendHosErrorCounter.inc(); + if (loadBalanceMode == 1 && (e instanceof HttpHostConnectException || e instanceof ConnectTimeoutException)) { + endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size())); + } + } finally { + IoUtil.close(response); + } + } } } diff --git a/src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java b/src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java new file mode 100644 index 0000000..e4f3f51 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java @@ -0,0 +1,50 @@ +package com.zdjizhi.utils; + +import org.apache.hadoop.hbase.util.Bytes; + +public interface HBaseColumnConstants { + + String FAMILY_DATA = "data"; + String FAMILY_META = "meta"; + String COLUMN_FILENAME = "filename"; + String COLUMN_CONTENT_TYPE = "content_type"; + String COLUMN_CONTENT_LENGTH = "content_length"; + String COLUMN_LAST_MODIFIED = "last_modified"; + String COLUMN_INDEX_FILENAME_KEY = "index_filename_key"; + String COLUMN_INDEX_TIME_KEY = "index_time_key"; + String COLUMN_UPLOAD_METHOD = "upload_method"; + String COLUMN_STORAGE_BACKEND = "storage_backend"; + String COLUMN_FILE_PATH = "file_path"; + String COLUMN_APPEND_MODE = "append_mode"; + String COLUMN_APPEND_PART_SIZE = "append_part_size"; + String COLUMN_APPEND_PART_CHUNK_COUNT = "append_part_chunk_count"; + String COLUMN_COMBINE_MODE = "combine_mode"; + String COLUMN_USER_DEFINED_META_PREFIX = "user_defined_meta_"; + String BUCKET_COLUMN_USER = "user"; + String BUCKET_COLUMN_PRIVILEGE = "privilege"; + String BUCKET_COLUMN_TTL = "ttl"; + String BUCKET_COLUMN_WAL = "wal"; + String BUCKET_COLUMN_LOCATION = "location"; + + byte[] BYTE_FAMILY_DATA = Bytes.toBytes(FAMILY_DATA); + byte[] BYTE_FAMILY_META = Bytes.toBytes(FAMILY_META); + byte[] BYTE_COLUMN_FILENAME = Bytes.toBytes(COLUMN_FILENAME); + byte[] BYTE_COLUMN_CONTENT_TYPE = Bytes.toBytes(COLUMN_CONTENT_TYPE); + byte[] BYTE_COLUMN_CONTENT_LENGTH = Bytes.toBytes(COLUMN_CONTENT_LENGTH); + byte[] BYTE_COLUMN_LAST_MODIFIED = Bytes.toBytes(COLUMN_LAST_MODIFIED); + byte[] BYTE_COLUMN_INDEX_FILENAME_KEY = Bytes.toBytes(COLUMN_INDEX_FILENAME_KEY); + byte[] BYTE_COLUMN_INDEX_TIME_KEY = Bytes.toBytes(COLUMN_INDEX_TIME_KEY); + byte[] BYTE_COLUMN_UPLOAD_METHOD = Bytes.toBytes(COLUMN_UPLOAD_METHOD); + byte[] BYTE_COLUMN_STORAGE_BACKEND = Bytes.toBytes(COLUMN_STORAGE_BACKEND); + byte[] BYTE_COLUMN_FILE_PATH = Bytes.toBytes(COLUMN_FILE_PATH); + byte[] BYTE_COLUMN_APPEND_MODE = Bytes.toBytes(COLUMN_APPEND_MODE); + byte[] BYTE_COLUMN_APPEND_PART_SIZE = Bytes.toBytes(COLUMN_APPEND_PART_SIZE); + byte[] BYTE_COLUMN_APPEND_PART_CHUNK_COUNT = Bytes.toBytes(COLUMN_APPEND_PART_CHUNK_COUNT); + byte[] BYTE_COLUMN_COMBINE_MODE = Bytes.toBytes(COLUMN_COMBINE_MODE); + byte[] BYTE_BUCKET_COLUMN_USER = Bytes.toBytes(BUCKET_COLUMN_USER); + byte[] BYTE_BUCKET_COLUMN_PRIVILEGE = Bytes.toBytes(BUCKET_COLUMN_PRIVILEGE); + byte[] BYTE_BUCKET_COLUMN_TTL = Bytes.toBytes(BUCKET_COLUMN_TTL); + byte[] BYTE_BUCKET_COLUMN_WAL = Bytes.toBytes(BUCKET_COLUMN_WAL); + byte[] BYTE_BUCKET_COLUMN_LOCATION = Bytes.toBytes(BUCKET_COLUMN_LOCATION); + +} diff --git a/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java b/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java new file mode 100644 index 0000000..be82770 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java @@ -0,0 +1,59 
@@
+package com.zdjizhi.utils;
+
+import com.zdjizhi.config.Configs;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionConfiguration;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+public class HBaseConnectionUtil {
+
+    private static HBaseConnectionUtil hbaseConnectionUtil = null;
+    private final org.apache.hadoop.conf.Configuration hbaseConfiguration;
+
+    private HBaseConnectionUtil(Configuration configuration) {
+        hbaseConfiguration = HBaseConfiguration.create();
+        hbaseConfiguration.set(HConstants.ZOOKEEPER_QUORUM, configuration.getString(Configs.SINK_HBASE_ZOOKEEPER));
+        hbaseConfiguration.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
+        hbaseConfiguration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
+        hbaseConfiguration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, configuration.get(Configs.SINK_HBASE_RETRIES_NUMBER) + "");
+        hbaseConfiguration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, configuration.get(Configs.SINK_HBASE_RPC_TIMEOUT) + "");
+        hbaseConfiguration.set(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, "1073741800");
+        hbaseConfiguration.set(ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY, configuration.get(Configs.SINK_HBASE_CLIENT_WRITE_BUFFER) + "");
+        hbaseConfiguration.set(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, configuration.get(Configs.SINK_HBASE_CLIENT_IPC_POOL_SIZE) + "");
+    }
+
+    public static synchronized HBaseConnectionUtil getInstance(Configuration configuration) {
+        if (null == hbaseConnectionUtil) {
+            hbaseConnectionUtil = new HBaseConnectionUtil(configuration);
+        }
+        return hbaseConnectionUtil;
+    }
+
+    public Connection getSyncHBaseConnection() {
+        Connection syncHBaseConnection;
+        try {
+            syncHBaseConnection = ConnectionFactory.createConnection(hbaseConfiguration);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return syncHBaseConnection;
+    }
+
+    public AsyncConnection getAsyncHBaseConnection() {
+        AsyncConnection asyncHBaseConnection;
+        try {
+            asyncHBaseConnection = ConnectionFactory.createAsyncConnection(hbaseConfiguration).get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        return asyncHBaseConnection;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
index 37f8975..f2e5d33 100644
--- a/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
+++ b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
@@ -2,14 +2,11 @@ package com.zdjizhi.utils;
 
 import com.zdjizhi.config.Configs;
 import org.apache.flink.configuration.Configuration;
-import org.apache.http.Header;
 import org.apache.http.HttpEntityEnclosingRequest;
 import org.apache.http.HttpRequest;
 import org.apache.http.NoHttpResponseException;
 import org.apache.http.client.HttpRequestRetryHandler;
 import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.config.Registry;
 import org.apache.http.config.RegistryBuilder;
@@ -17,13 +14,18 @@ import org.apache.http.conn.socket.ConnectionSocketFactory;
 import 
org.apache.http.conn.socket.PlainConnectionSocketFactory;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOReactorException;
 
 import javax.net.ssl.*;
-import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.ConnectException;
 import java.net.UnknownHostException;
@@ -34,19 +36,10 @@ import java.security.cert.X509Certificate;
 
 public class HttpClientUtil {
 
     private static HttpClientUtil httpClientUtil;
-    private final CloseableHttpClient closeableHttpClient;
     private final Configuration configuration;
 
     private HttpClientUtil(Configuration configuration) {
         this.configuration = configuration;
-        closeableHttpClient = HttpClients.custom()
-                // apply the request timeouts to the client
-                .setDefaultRequestConfig(getRequestConfig())
-                // apply the retry handler to the client
-                .setRetryHandler(getRetryHandler())
-                // configure the pooled connection manager
-                .setConnectionManager(getSslClientManager())
-                .build();
     }
 
     private RequestConfig getRequestConfig() {
@@ -87,7 +80,7 @@ }
-    private PoolingHttpClientConnectionManager getSslClientManager() {
+    private PoolingHttpClientConnectionManager getSyncSslClientManager() {
         PoolingHttpClientConnectionManager connManager;
         try {
             X509TrustManager trustManager = new X509TrustManager() {
                 @Override
@@ -122,6 +115,26 @@
         return connManager;
     }
 
+    private PoolingNHttpClientConnectionManager getAsyncSslClientManager() {
+        PoolingNHttpClientConnectionManager connManager;
+        try {
+            // No custom TLS strategy is registered for the async client yet; an
+            // SSLIOSessionStrategy can be plugged in here if hos is served over HTTPS.
+            IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+                    .setIoThreadCount(Runtime.getRuntime().availableProcessors())
+                    .setSoKeepAlive(true)
+                    .build();
+            ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
+            connManager = new PoolingNHttpClientConnectionManager(ioReactor);
+            connManager.setMaxTotal(configuration.get(Configs.SINK_HOS_HTTP_MAX_TOTAL));
+            connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HOS_HTTP_MAX_PER_ROUTE));
+        } catch (IOReactorException e) { 
throw new RuntimeException(e.getMessage()); + } + return connManager; + } + public static synchronized HttpClientUtil getInstance(Configuration configuration) { if (null == httpClientUtil) { httpClientUtil = new HttpClientUtil(configuration); @@ -129,20 +161,19 @@ public class HttpClientUtil { return httpClientUtil; } - public void close() throws IOException { - closeableHttpClient.close(); + public CloseableHttpClient getSyncHttpClient() { + return HttpClients.custom() + .setDefaultRequestConfig(getRequestConfig()) + .setRetryHandler(getRetryHandler()) + .setConnectionManager(getSyncSslClientManager()) + .build(); } - public CloseableHttpResponse httpPut(String url, byte[] requestBody, Header... headers) throws IOException { - HttpPut put = new HttpPut(url); - if (StringUtil.isNotEmpty(headers)) { - for (Header header : headers) { - if (StringUtil.isNotEmpty(header)) { - put.addHeader(header); - } - } - } - put.setEntity(new ByteArrayEntity(requestBody)); - return closeableHttpClient.execute(put); + public CloseableHttpAsyncClient getAsyncHttpClient() { + return HttpAsyncClients.custom() + .setDefaultRequestConfig(getRequestConfig()) + .setConnectionManager(getAsyncSslClientManager()) + .build(); } + } diff --git a/src/main/java/com/zdjizhi/utils/HttpHeaderConstants.java b/src/main/java/com/zdjizhi/utils/HttpHeaderConstants.java new file mode 100644 index 0000000..1ef3d93 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/HttpHeaderConstants.java @@ -0,0 +1,54 @@ +package com.zdjizhi.utils; + +public interface HttpHeaderConstants { + /* + * Standard HTTP Headers + */ + String CACHE_CONTROL = "Cache-Control"; + String CONTENT_DISPOSITION = "Content-Disposition"; + String CONTENT_ENCODING = "Content-Encoding"; + String CONTENT_LENGTH = "Content-Length"; + String CONTENT_RANGE = "Content-Range"; + String CONTENT_MD5 = "Content-MD5"; + String CONTENT_TYPE = "Content-Type"; + String CONTENT_LANGUAGE = "Content-Language"; + String DATE = "Date"; + String LAST_MODIFIED = "Last-Modified"; + String SERVER = "Server"; + String CONNECTION = "Connection"; + String ETAG = "ETag"; + + /* + * Hos HTTP Headers + */ + String AMZ_ACL = "x-amz-acl"; + String AMZ_CONTENT_SHA_256 = "x-amz-content-sha256"; + String AMZ_DATE = "x-amz-date"; + String HOS_WAL = "x-hos-wal"; + String HOS_COMPRESSION = "x-hos-compression"; + String HOS_SPLITS = "x-hos-splits"; + String HOS_UPLOAD_TYPE = "x-hos-upload-type"; + String HOS_START_TIME = "x-hos-start-time"; + String HOS_END_TIME = "x-hos-end-time"; + String HOS_QUICK = "x-hos-quick"; + String HOS_OBJECT_INFO = "x-hos-object-info"; + String TOKEN = "token"; + String HOS_POSITION = "x-hos-position"; + String HOS_NEXT_APPEND_POSITION = "x-hos-next-append-position"; + String HOS_COMBINE_MODE = "x-hos-combine-mode"; + String HOS_PART_NUMBER = "x-hos-part-number"; + String HOS_OFFSET = "x-hos-offset"; + String HOS_PART_LAST_FLAG = "x-hos-part-last-flag"; + String HOS_PART_CHUNK_COUNT = "x-hos-part-chunk-count"; + String HOS_PART_CHUNK_NUMBERS = "x-hos-part-chunk-numbers"; + String HOS_META_FILE_TYPE = "x-hos-meta-file-type"; + String HOS_META_FILENAME = "x-hos-meta-filename"; + String FILE_SIZE = "File-Size"; + String LOCATION = "Location"; + String HOS_OBJECT_TYPE = "x-hos-object-type"; + String HOS_APPEND_INFO = "x-hos-append-info"; + String HOS_META_PREFIX = "x-hos-meta-"; + String HOS_METADATA_DIRECTIVE = "x-hos-metadata-directive"; + String HOS_OBJECTS_META = "x-hos-objects-meta"; + String HOS_OBJECTS_OFFSET = "x-hos-objects-offset"; +} diff --git 
a/src/main/java/com/zdjizhi/utils/PublicConstants.java b/src/main/java/com/zdjizhi/utils/PublicConstants.java
new file mode 100644
index 0000000..39586b2
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/PublicConstants.java
@@ -0,0 +1,33 @@
+package com.zdjizhi.utils;
+
+public interface PublicConstants {
+    String STORAGE_BACKEND_HBASE = "hbase";
+    String STORAGE_BACKEND_FILESYSTEM = "filesystem";
+    String APPEND_MODE_OFFSET = "offset";
+    String APPEND_MODE_PART_NUMBER = "partNumber";
+    String COMBINE_MODE_APPEND = "append";
+    String COMBINE_MODE_SEEK = "seek";
+    String MULTIPART_FILE_PART_MESSAGE_ETAG = "etag";
+    String MULTIPART_FILE_PART_MESSAGE_SIZE = "partSize";
+    String APPEND_FILE_PART_MESSAGE_SIZE = "partSize";
+    String APPEND_FILE_PART_MESSAGE_CHUNK_COUNT = "chunkCount";
+    String APPEND_FILE_PART_MESSAGE_LAST_PART_FLAG = "lastPartFlag";
+    String APPEND_FILE_PART_MESSAGE_CHUNK_NUMBERS = "chunkNumbers";
+    String BUCKET_META_PRIVILEGE_PRIVATE = "private";
+    String BUCKET_META_PRIVILEGE_PUBLIC = "public-read-write";
+    String DEFAULT_BUCKET_META_WAL = "close";
+    String UPLOAD_TYPE_UPDATE = "put";
+    String UPLOAD_TYPE_APPENDV2 = "appendV2";
+    String UPLOAD_TYPE_APPEND = "append";
+    String DEFAULT_GET_FILE_LIST_QUICK = "false";
+    String FILE_DATA_ROW_SUFFIX = "0";
+    String FILE_DATA_COLUMN = "0";
+    String FILE_TYPE_PCAPNG = "pcapng";
+    String FILE_TYPE_PCAP = "pcap";
+    String FILE_META_FILE_TYPE = "file_type";
+    String FILE_META_FILENAME = "filename";
+    String OBJECT_TYPE_NORMAL = "Normal";
+    String OBJECT_TYPE_APPENDABLE = "Appendable";
+    String OBJECT_TYPE_MULTIPART = "Multipart";
+    String DEFAULT_METADATA_DIRECTIVE = "REPLACE_NEW";
+}
diff --git a/src/main/java/com/zdjizhi/utils/PublicUtil.java b/src/main/java/com/zdjizhi/utils/PublicUtil.java
index ac11fbb..6074954 100644
--- a/src/main/java/com/zdjizhi/utils/PublicUtil.java
+++ b/src/main/java/com/zdjizhi/utils/PublicUtil.java
@@ -1,25 +1,19 @@
 package com.zdjizhi.utils;
 
-import cn.hutool.core.io.IoUtil;
 import cn.hutool.core.util.ArrayUtil;
-import cn.hutool.core.util.CharUtil;
 import cn.hutool.core.util.StrUtil;
+import cn.hutool.crypto.digest.DigestUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.config.Configs;
 import com.zdjizhi.pojo.FileChunk;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
-import org.apache.http.Header;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.util.EntityUtils;
 
-import java.io.IOException;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_SEEK;
+
 public class PublicUtil {
 
     private static final Log LOG = LogFactory.get();
@@ -27,8 +21,8 @@
         List<FileChunk> combinedFileChunkList = new ArrayList<>();
         try {
             List<FileChunk> originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList());
             List<FileChunk> waitingToCombineChunkList = new ArrayList<>();
-            if ("seek".equals(originalFileChunkList.get(0).getCombineMode())) {
+            if (COMBINE_MODE_SEEK.equals(originalFileChunkList.get(0).getCombineMode())) {
                 seekChunkCounter.inc();
                 // sort by offset
                 originalFileChunkList.sort(Comparator.comparingLong(FileChunk::getOffset));
@@ -118,7 +113,7 @@
             byte[][] bytes = new byte[byteList.size()][];
             byteList.toArray(bytes);
             byte[] newData = 
ArrayUtil.addAll(bytes); - if ("seek".equals(combineMode)) { + if (COMBINE_MODE_SEEK.equals(combineMode)) { fileChunk.setOffset(offset); fileChunk.setLastChunkFlag(lastChunkFlag); } else { @@ -137,55 +132,17 @@ public class PublicUtil { return fileChunk; } - public static void sendToHos(FileChunk fileChunk, Configuration configuration,Counter sendHosErrorCounter){ - CloseableHttpResponse response = null; - try { - String url = configuration.get(Configs.SINK_HOS_ENDPOINT) + "/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid(); - byte[] data; - if (fileChunk.getChunk() != null) { - data = fileChunk.getChunk(); - } else { - data = "".getBytes(); - } - List
headers = new ArrayList<>(); - headers.add(new BasicHeader("token", configuration.get(Configs.SINK_HOS_TOKEN))); - headers.add(new BasicHeader("x-hos-upload-type", "appendV2")); - headers.add(new BasicHeader("x-hos-combine-mode", fileChunk.getCombineMode())); - String filename = fileChunk.getFileName(); - if (StrUtil.isNotEmpty(filename) && filename.contains(".")) { - headers.add(new BasicHeader("x-hos-meta-filename", filename)); - } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) { - filename = filename + "." + fileChunk.getFileType(); - headers.add(new BasicHeader("x-hos-meta-filename", filename)); - } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) { - headers.add(new BasicHeader("x-hos-meta-file-type", fileChunk.getFileType())); - } - if ("seek".equals(fileChunk.getCombineMode())) { - headers.add(new BasicHeader("x-hos-offset", fileChunk.getOffset() + "")); - headers.add(new BasicHeader("x-hos-part-last-flag", fileChunk.getLastChunkFlag() + "")); - } else { - headers.add(new BasicHeader("x-hos-part-number", fileChunk.getTimestamp() + "")); - headers.add(new BasicHeader("x-hos-part-chunk-numbers", fileChunk.getChunkNumbers())); - } - headers.add(new BasicHeader("x-hos-part-chunk-count", fileChunk.getChunkCount() + "")); - Map metaMap = fileChunk.getMeta(); - if (metaMap != null && metaMap.size() > 0) { - for (String meta : metaMap.keySet()) { - headers.add(new BasicHeader("x-hos-meta-" + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "")); - } - } - response = HttpClientUtil.getInstance(configuration).httpPut(url, data, headers.toArray(new Header[0])); - if (response.getStatusLine().getStatusCode() != 200) { - String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8"); - LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". 
message: " + responseEntity); - sendHosErrorCounter.inc(); - } - } catch (IOException e) { - LOG.error("put part to hos error.", e); - sendHosErrorCounter.inc(); - } finally { - IoUtil.close(response); - } + public static String getUUID() { + return UUID.randomUUID().toString().replace("-", "").toLowerCase(); } + public static String getRowKey(String filename) { + String md5str = DigestUtil.md5Hex(filename); + md5str = md5str.substring(8, 24); + return md5str; + } + + public static String getIndexDataHead(String filename) { + return getRowKey(filename).substring(0, 1); + } } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index e8e2d19..b71bda5 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -1,8 +1,7 @@ flink.job.name=agg_traffic_file_chunk_combine #source -source.kafka.parallelism=1 #9092Ϊ֤ 9095Ϊssl 9094Ϊsasl -source.kafka.broker=192.168.44.12:9092 +source.kafka.broker=192.168.40.151:9092,192.168.40.152:9092,192.168.40.203:9092 source.kafka.group.id=test1 source.kafka.topic=TRAFFIC-FILE-STREAM-RECORD #earliestͷʼ latest @@ -19,21 +18,38 @@ source.kafka.user=admin source.kafka.pin=galaxy2019 #SSLҪ source.kafka.tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ -parse.message.pack.parallelism=1 # -combiner.window.parallelism=3 +combiner.window.parallelism=2 combiner.window.time=10 #೤ʱδд򴥷 combiner.window.idle.time=5 -combiner.window.key.max.chunk=100000 +file.max.chunk.count=100000 +file.max.size=1073741824 +#evalʽֶι +#filter.expression=FileChunk.fileType == "txt" || FileChunk.fileType == "eml" +#sink +sink.parallelism=2 +sink.type=hos +sink.async=false +sink.batch=true +sink.batch.count=100 +sink.batch.size=102400 #hos sink -sink.hos.parallelism=3 -sink.hos.endpoint=http://192.168.44.12:9098/hos +#0nginx1ѯhosĬ0 +sink.hos.load.balance.mode=1 +#nginx򵥸hosΪip:portʶhosΪip1,ip2:port1,port2 +sink.hos.endpoint=192.168.40.151,192.168.40.152,192.168.40.203:8186 sink.hos.bucket=traffic_file_bucket sink.hos.token=c21f969b5f03d33d43e04f8f136e7682 sink.hos.http.error.retry=3 -sink.hos.http.max.total=2000 -sink.hos.http.max.per.route=1000 -sink.hos.http.connect.timeout=10000 -sink.hos.http.request.timeout=10000 -sink.hos.http.socket.timeout=60000 \ No newline at end of file +sink.hos.http.max.total=10 +sink.hos.http.max.per.route=10 +sink.hos.http.connect.timeout=1000 +sink.hos.http.request.timeout=5000 +sink.hos.http.socket.timeout=60000 +#hbase sink +sink.hbase.zookeeper=192.168.44.12 +sink.hbase.retries.number=10 +sink.hbase.rpc.timeout=600000 +sink.hbase.client.write.buffer=10971520 +sink.hbase.client.ipc.pool.size=3 \ No newline at end of file