Optimize configuration

houjinchuan
2024-07-08 10:07:07 +08:00
parent caf3c7ff84
commit 942acad964
14 changed files with 240 additions and 378 deletions

View File

@@ -6,31 +6,22 @@ import com.zdjizhi.function.*;
import com.zdjizhi.function.map.ParseMessagePackMapFunction;
import com.zdjizhi.function.map.ParseProxyFileMetaFlatMapFunction;
import com.zdjizhi.function.map.ParseSessionFileMetaFlatMapFunction;
import com.zdjizhi.function.map.SideOutputMapFunction;
import com.zdjizhi.kafka.FileMetaKafkaConsumer;
import com.zdjizhi.pojo.*;
import com.zdjizhi.sink.*;
import com.zdjizhi.kafka.KafkaConsumer;
import com.zdjizhi.trigger.IdleTimeTrigger;
import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger;
import com.zdjizhi.trigger.LastChunkTrigger;
import com.zdjizhi.trigger.MultipleTrigger;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.*;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.*;
public class FileChunkCombiner {
@@ -41,68 +32,33 @@ public class FileChunkCombiner {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.getConfig().setGlobalJobParameters(configuration);
SingleOutputStreamOperator<FileChunk> windowStream;
OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {
};
if (configuration.getInteger(Configs.COMBINER_WINDOW_TYPE) == 0) {
WatermarkStrategy<FileChunk> watermarkStrategy = WatermarkStrategy
.<FileChunk>forBoundedOutOfOrderness(Duration.ofSeconds(configuration.get(Configs.COMBINER_WINDOW_ALLOWED_LATENESS)))
.withTimestampAssigner((fileChunk, timestamp) -> fileChunk.getTimestamp() / 1000);
SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
.addSource(KafkaConsumer.byteArrayConsumer(configuration))
.name(configuration.get(Configs.KAFKA_TOPIC))
.map(new ParseMessagePackMapFunction())
.name("Map: Parse Message Pack")
.filter(new FileChunkFilterFunction(configuration.getString(Configs.MAP_FILTER_EXPRESSION), "map_parse_message_pack"))
.name("Filter: Map");
SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
.addSource(KafkaConsumer.byteArrayConsumer(configuration))
.name(configuration.get(Configs.KAFKA_TOPIC))
.map(new ParseMessagePackMapFunction())
.name("Map: Parse Message Pack")
.filter(new FileChunkFilterFunction(configuration.getString(Configs.MAP_FILTER_EXPRESSION), "map_parse_message_pack"))
.name("Filter: Map")
.assignTimestampsAndWatermarks(watermarkStrategy);
List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
triggers.add(EventTimeTrigger.create());
if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
triggers.add(LastChunkOrNoDataInTimeTrigger.of(configuration.get(Configs.COMBINER_WINDOW_IDLE_TIME) * 1000));
} else {
triggers.add(IdleTimeTrigger.of(configuration.get(Configs.COMBINER_WINDOW_IDLE_TIME) * 1000));
}
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
windowStream = parseMessagePackStream
.keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
.window(TumblingEventTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_TIME))))
.trigger(trigger)
.sideOutputLateData(delayedChunkOutputTag)
.allowedLateness(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_ALLOWED_LATENESS)))
.process(new CombineChunkProcessWindowFunction(configuration.get(Configs.FILE_MAX_CHUNK_COUNT)))
.name("Window: Combine Chunk")
.setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM));
} else {
SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
.addSource(KafkaConsumer.byteArrayConsumer(configuration))
.name(configuration.get(Configs.KAFKA_TOPIC))
.map(new ParseMessagePackMapFunction())
.name("Map: Parse Message Pack")
.filter(new FileChunkFilterFunction(configuration.getString(Configs.MAP_FILTER_EXPRESSION), "map_parse_message_pack"))
.name("Filter: Map");
List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
triggers.add(ProcessingTimeTrigger.create());
if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
triggers.add(LastChunkTrigger.create());
}
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
windowStream = parseMessagePackStream
.keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
.window(TumblingProcessingTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_TIME))))
.trigger(trigger)
.process(new CombineChunkProcessWindowFunction(configuration.get(Configs.FILE_MAX_CHUNK_COUNT)))
.name("Window: Combine Chunk")
.setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM));
List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
triggers.add(ProcessingTimeTrigger.create());
if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
triggers.add(LastChunkTrigger.create());
}
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
SingleOutputStreamOperator<FileChunk> windowStream = parseMessagePackStream
.keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
.window(TumblingProcessingTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_SIZE))))
.trigger(trigger)
.process(new CombineChunkProcessWindowFunction())
.name("Window: Combine Chunk")
.setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM));
SingleOutputStreamOperator<FileChunk> fileMetaSessionSingleOutputStreamOperator;
SingleOutputStreamOperator<FileChunk> fileMetaProxySingleOutputStreamOperator;
for (String sinkType : configuration.get(Configs.SINK_TYPE).split(",")) {
switch (sinkType) {
case "hos":
DataStream<FileChunk> sideOutput = windowStream.getSideOutput(delayedChunkOutputTag);
if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
windowStream
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hos"))
@@ -111,24 +67,14 @@ public class FileChunkCombiner {
.addSink(new HosSink(configuration))
.name("Hos")
.setParallelism(configuration.get(Configs.SINK_PARALLELISM));
sideOutput = sideOutput
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "side_out_put_hos"))
.name("Filter: Delayed Chunk")
.setParallelism(configuration.get(Configs.SINK_PARALLELISM));
} else {
windowStream
.addSink(new HosSink(configuration))
.name("Hos")
.setParallelism(configuration.get(Configs.SINK_PARALLELISM));
}
sideOutput.map(new SideOutputMapFunction())
.setParallelism(configuration.get(Configs.SINK_PARALLELISM))
.addSink(new HosSink(configuration))
.setParallelism(configuration.get(Configs.SINK_PARALLELISM))
.name("Delayed Chunk");
break;
case "hbase":
sideOutput = windowStream.getSideOutput(delayedChunkOutputTag);
if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
windowStream
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hbase"))
@@ -137,42 +83,26 @@ public class FileChunkCombiner {
.addSink(new HBaseSink(configuration))
.name("HBase")
.setParallelism(configuration.get(Configs.SINK_PARALLELISM));
sideOutput = sideOutput
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "side_out_put_hbase"))
.name("Filter: Delayed Chunk")
.setParallelism(configuration.get(Configs.SINK_PARALLELISM));
} else {
windowStream
.addSink(new HBaseSink(configuration))
.name("HBase")
.setParallelism(configuration.get(Configs.SINK_PARALLELISM));
}
sideOutput
.map(new SideOutputMapFunction())
.setParallelism(configuration.get(Configs.SINK_PARALLELISM))
.addSink(new HBaseSink(configuration))
.setParallelism(configuration.get(Configs.SINK_PARALLELISM))
.name("Delayed Chunk");
break;
case "oss":
SingleOutputStreamOperator<FileChunk> fileMetaSessionSingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC)))
.setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
fileMetaSessionSingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC)))
.name(configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC))
.flatMap(new ParseSessionFileMetaFlatMapFunction())
.setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
.name("Map: Parse Session File Meta")
.filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_session_file_meta"))
.name("Filter: Map")
.setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
SingleOutputStreamOperator<FileChunk> fileMetaProxySingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC)))
.setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
.name("Filter: Map");
fileMetaProxySingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC)))
.name(configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC))
.flatMap(new ParseProxyFileMetaFlatMapFunction())
.setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
.name("Map: Parse Proxy File Meta")
.filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_proxy_file_meta"))
.name("Filter: Map")
.setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
.name("Filter: Map");
windowStream
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_OSS_FILTER_EXPRESSION), "sink_oss"))
.name("Filter: Oss")

View File

@@ -43,25 +43,16 @@ public class Configs {
.stringType()
.noDefaultValue();
public static final ConfigOption<Integer> MAP_PARSE_FILE_META_PARALLELISM = ConfigOptions.key("map.parse.file.meta.parallelism")
.intType()
.defaultValue(1);
public static final ConfigOption<String> MAP_FILTER_EXPRESSION = ConfigOptions.key("map.filter.expression")
.stringType()
.defaultValue("");
public static final ConfigOption<Integer> COMBINER_WINDOW_TYPE = ConfigOptions.key("combiner.window.type")
.intType()
.defaultValue(0);
public static final ConfigOption<Integer> COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism")
.intType()
.defaultValue(1);
public static final ConfigOption<Long> COMBINER_WINDOW_TIME = ConfigOptions.key("combiner.window.time")
public static final ConfigOption<Long> COMBINER_WINDOW_SIZE = ConfigOptions.key("combiner.window.size")
.longType()
.defaultValue(5L);
public static final ConfigOption<Long> COMBINER_WINDOW_ALLOWED_LATENESS = ConfigOptions.key("combiner.window.allowed.lateness")
.longType()
.defaultValue(0L);
public static final ConfigOption<Long> COMBINER_WINDOW_IDLE_TIME = ConfigOptions.key("combiner.window.idle.time")
.longType()
.defaultValue(5L);
.defaultValue(10L);
public static final ConfigOption<Boolean> COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER = ConfigOptions.key("combiner.window.enable.last.chunk.trigger")
.booleanType()
.defaultValue(true);
@@ -75,18 +66,6 @@ public class Configs {
public static final ConfigOption<Boolean> SINK_ASYNC = ConfigOptions.key("sink.async")
.booleanType()
.defaultValue(false);
public static final ConfigOption<Boolean> SINK_BATCH = ConfigOptions.key("sink.batch")
.booleanType()
.defaultValue(false);
public static final ConfigOption<Integer> SINK_BATCH_COUNT = ConfigOptions.key("sink.batch.count")
.intType()
.defaultValue(1);
public static final ConfigOption<Long> SINK_BATCH_SIZE = ConfigOptions.key("sink.batch.size")
.longType()
.defaultValue(Long.MAX_VALUE);
public static final ConfigOption<Integer> SINK_BATCH_TIME = ConfigOptions.key("sink.batch.time")
.intType()
.defaultValue(5);
public static final ConfigOption<String> SINK_FILTER_EXPRESSION = ConfigOptions.key("sink.filter.expression")
.stringType()
.defaultValue("");
@@ -106,43 +85,39 @@ public class Configs {
public static final ConfigOption<String> SINK_HOS_TOKEN = ConfigOptions.key("sink.hos.token")
.stringType()
.defaultValue("");
public static final ConfigOption<Long> SINK_HOS_BATCH_SIZE = ConfigOptions.key("sink.hos.batch.size")
.longType()
.defaultValue(0L);
public static final ConfigOption<Integer> SINK_HOS_BATCH_INTERVAL_MS = ConfigOptions.key("sink.hos.batch.interval.ms")
.intType()
.defaultValue(0);
public static final ConfigOption<Integer> SINK_HTTP_MAX_TOTAL = ConfigOptions.key("sink.http.max.total")
public static final ConfigOption<Integer> SINK_HTTP_CLIENT_MAX_TOTAL = ConfigOptions.key("sink.http.client.max.total")
.intType()
.defaultValue(2000);
public static final ConfigOption<Integer> SINK_HTTP_MAX_PER_ROUTE = ConfigOptions.key("sink.http.max.per.route")
.defaultValue(100);
public static final ConfigOption<Integer> SINK_HTTP_CLIENT_MAX_PER_ROUTE = ConfigOptions.key("sink.http.client.max.per.route")
.intType()
.defaultValue(1000);
public static final ConfigOption<Integer> SINK_HTTP_ERROR_RETRY = ConfigOptions.key("sink.http.error.retry")
.defaultValue(100);
public static final ConfigOption<Integer> SINK_HTTP_CLIENT_RETRIES_NUMBER = ConfigOptions.key("sink.http.client.retries.number")
.intType()
.defaultValue(3);
public static final ConfigOption<Integer> SINK_HTTP_CONNECT_TIMEOUT = ConfigOptions.key("sink.http.connect.timeout")
public static final ConfigOption<Integer> SINK_HTTP_CLIENT_CONNECT_TIMEOUT_MS = ConfigOptions.key("sink.http.client.connect.timeout.ms")
.intType()
.defaultValue(10000);
public static final ConfigOption<Integer> SINK_HTTP_REQUEST_TIMEOUT = ConfigOptions.key("sink.http.request.timeout")
public static final ConfigOption<Integer> SINK_HTTP_CLIENT_REQUEST_TIMEOUT_MS = ConfigOptions.key("sink.http.client.request.timeout.ms")
.intType()
.defaultValue(10000);
public static final ConfigOption<Integer> SINK_HTTP_SOCKET_TIMEOUT = ConfigOptions.key("sink.http.socket.timeout")
public static final ConfigOption<Integer> SINK_HTTP_CLIENT_SOCKET_TIMEOUT_MS = ConfigOptions.key("sink.http.client.socket.timeout.ms")
.intType()
.defaultValue(60000);
public static final ConfigOption<Boolean> SINK_OSS_ASYNC = ConfigOptions.key("sink.oss.async")
.booleanType()
.defaultValue(false);
public static final ConfigOption<String> SINK_OSS_ENDPOINT = ConfigOptions.key("sink.oss.endpoint")
.stringType()
.noDefaultValue();
public static final ConfigOption<String> SINK_OSS_FILTER_EXPRESSION = ConfigOptions.key("sink.oss.filter.expression")
.stringType()
.defaultValue("");
public static final ConfigOption<String> SINK_HBASE_ZOOKEEPER = ConfigOptions.key("sink.hbase.zookeeper")
.stringType()
.defaultValue("");
.defaultValue("127.0.0.1");
public static final ConfigOption<Integer> SINK_HBASE_RETRIES_NUMBER = ConfigOptions.key("sink.hbase.retries.number")
.intType()
.defaultValue(10);
public static final ConfigOption<Integer> SINK_HBASE_RPC_TIMEOUT = ConfigOptions.key("sink.hbase.rpc.timeout")
public static final ConfigOption<Integer> SINK_HBASE_RPC_TIMEOUT_MS = ConfigOptions.key("sink.hbase.rpc.timeout.ms")
.intType()
.defaultValue(600000);
public static final ConfigOption<Integer> SINK_HBASE_CLIENT_WRITE_BUFFER = ConfigOptions.key("sink.hbase.client.write.buffer")
@@ -150,18 +125,29 @@ public class Configs {
.defaultValue(10485760);
public static final ConfigOption<Integer> SINK_HBASE_CLIENT_IPC_POOL_SIZE = ConfigOptions.key("sink.hbase.client.ipc.pool.size")
.intType()
.defaultValue(1);
public static final ConfigOption<Integer> FILE_MAX_CHUNK_COUNT = ConfigOptions.key("file.max.chunk.count")
.defaultValue(3);
public static final ConfigOption<Long> SINK_HBASE_BATCH_SIZE = ConfigOptions.key("sink.hbase.batch.size")
.longType()
.defaultValue(0L);
public static final ConfigOption<Integer> SINK_HBASE_BATCH_INTERVAL_MS = ConfigOptions.key("sink.hbase.batch.interval.ms")
.intType()
.defaultValue(100000);
public static final ConfigOption<String> MAP_FILTER_EXPRESSION = ConfigOptions.key("map.filter.expression")
.defaultValue(0);
public static final ConfigOption<String> SINK_OSS_ENDPOINT = ConfigOptions.key("sink.oss.endpoint")
.stringType()
.noDefaultValue();
public static final ConfigOption<String> SINK_OSS_FILTER_EXPRESSION = ConfigOptions.key("sink.oss.filter.expression")
.stringType()
.defaultValue("");
public static final ConfigOption<Long> SINK_OSS_CACHE_TIME_MS = ConfigOptions.key("sink.oss.cache.time.ms")
.longType()
.defaultValue(0L);
public static final ConfigOption<Long> SINK_OSS_CACHE_SIZE = ConfigOptions.key("sink.oss.cache.size")
.longType()
.defaultValue(0L);
public static final ConfigOption<String> FILE_META_FILTER_EXPRESSION = ConfigOptions.key("file.meta.filter.expression")
.stringType()
.defaultValue("");
public static final ConfigOption<String> KAFKA_FILE_META_SESSION_TOPIC = ConfigOptions.key("source.kafka.file.meta.session.topic")
.stringType()
.noDefaultValue();
@@ -171,10 +157,4 @@ public class Configs {
public static final ConfigOption<String> KAFKA_FILE_META_GROUP_ID = ConfigOptions.key("source.kafka.file.meta.group.id")
.stringType()
.defaultValue("file_chunk_combine_1");
public static final ConfigOption<Long> FILE_META_CACHE_TIME = ConfigOptions.key("file.meta.cache.time")
.longType()
.defaultValue(0L);
public static final ConfigOption<Long> FILE_META_CACHE_SIZE = ConfigOptions.key("file.meta.cache.size")
.longType()
.defaultValue(0L);
}
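
These ConfigOption constants are consumed through Flink's Configuration, which falls back to the declared default when a key is absent. Below is a minimal sketch of how the renamed options behave at runtime; ConfigsDemo is a hypothetical harness, while the keys and defaults come from the definitions above.

```java
import org.apache.flink.configuration.Configuration;

public class ConfigsDemo {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();

        // Absent key -> declared default: combiner.window.size defaults to 5.
        long windowSize = configuration.get(Configs.COMBINER_WINDOW_SIZE);

        // Override through the typed option; the old key "combiner.window.time" no longer exists.
        configuration.set(Configs.COMBINER_WINDOW_SIZE, 30L);

        // Batching is now configured per sink; both knobs default to 0, which
        // the sinks treat as "batching disabled".
        long hosBatchSize = configuration.get(Configs.SINK_HOS_BATCH_SIZE);
        int hosBatchIntervalMs = configuration.get(Configs.SINK_HOS_BATCH_INTERVAL_MS);

        System.out.println(windowSize + " " + hosBatchSize + " " + hosBatchIntervalMs);
    }
}
```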

View File

@@ -36,11 +36,6 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
public transient Counter nullChunksCounter;
public transient Counter nullTxtChunksCounter;
public transient Counter nullEmlChunksCounter;
private final int fileMaxChunkCount;
public CombineChunkProcessWindowFunction(int fileMaxChunkCount) {
this.fileMaxChunkCount = fileMaxChunkCount;
}
@Override
public void open(Configuration parameters) throws Exception {
@@ -166,9 +161,6 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
waitingToCombineChunkList.add(chunk);
timestampAndSizes.append(originalFileChunk.getTimestamp()).append("-").append(chunk.length).append(";");
}
if (waitingToCombineChunkList.size() > fileMaxChunkCount) {
break;
}
}
if (waitingToCombineChunkList.size() > 0) {
FileChunk fileChunk = combineChunk(waitingToCombineChunkList, originalFileChunkList.get(0).getUuid(), originalFileChunkList.get(0).getFileName(), originalFileChunkList.get(0).getFileType(), 0, "append", 0, originalFileChunkList.get(0).getMeta(), startTimestamp, timestampAndSizes.toString());
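
With the fileMaxChunkCount cap gone, the window function now combines every chunk the trigger hands it. combineChunk itself is outside this diff; below is a hedged sketch of the core combine step, where FileChunk is the project's POJO and the setChunk setter is an assumption inferred from the getters used in these hunks.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch only: the real function also tracks metrics, null chunks, and the
// "timestamp-size" index string built above.
public class CombineSketch extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<FileChunk> elements,
                        Collector<FileChunk> out) throws IOException {
        List<FileChunk> ordered = new ArrayList<>();
        elements.forEach(ordered::add);
        if (ordered.isEmpty()) {
            return;
        }
        // Chunks of one file may arrive out of order within the window; sort by capture time.
        ordered.sort(Comparator.comparingLong(FileChunk::getTimestamp));

        ByteArrayOutputStream combined = new ByteArrayOutputStream();
        for (FileChunk chunk : ordered) {
            if (chunk.getChunk() != null) {
                combined.write(chunk.getChunk());
            }
        }
        FileChunk head = ordered.get(0);        // reuse the first chunk as the carrier
        head.setChunk(combined.toByteArray());  // assumed setter
        out.collect(head);
    }
}
```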

View File

@@ -188,9 +188,8 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChu
chunksInCounter.inc();
bytesInCounter.inc(messagePackData.length);
FileChunk fileChunk;
try {
try (MessageUnpacker messageUnpacker = MessagePack.newDefaultUnpacker(messagePackData)) {
fileChunk = new FileChunk();
MessageUnpacker messageUnpacker = MessagePack.newDefaultUnpacker(messagePackData);
int numFields = messageUnpacker.unpackMapHeader();
Map<String, Object> metaMap = new HashMap<>();
for (int i = 0; i < numFields; i++) {
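
The change above replaces a bare try with try-with-resources, so the MessageUnpacker is closed even when a field fails to parse. A standalone sketch of the same pattern with msgpack-java; the all-strings payload is a simplification, since the real function dispatches on field name and value type.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;

public final class MessagePackDemo {

    // Unpacks a msgpack map of string keys to string values.
    public static Map<String, Object> unpackStringMap(byte[] messagePackData) throws IOException {
        Map<String, Object> metaMap = new HashMap<>();
        // The unpacker is closed automatically, parse error or not.
        try (MessageUnpacker messageUnpacker = MessagePack.newDefaultUnpacker(messagePackData)) {
            int numFields = messageUnpacker.unpackMapHeader();
            for (int i = 0; i < numFields; i++) {
                String fieldName = messageUnpacker.unpackString();
                metaMap.put(fieldName, messageUnpacker.unpackString());
            }
        }
        return metaMap;
    }
}
```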

View File

@@ -1,33 +0,0 @@
package com.zdjizhi.function.map;
import com.zdjizhi.pojo.FileChunk;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_APPEND;
public class SideOutputMapFunction extends RichMapFunction<FileChunk, FileChunk> {
public transient Counter delayedChunksCounter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "side_out_put");
delayedChunksCounter = metricGroup.counter("delayedChunksCount");
metricGroup.meter("numChunksDelayPerSecond", new MeterView(delayedChunksCounter));
}
@Override
public FileChunk map(FileChunk fileChunk) {
delayedChunksCounter.inc();
fileChunk.setChunkCount(1);
if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
fileChunk.setChunkNumbers(fileChunk.getTimestamp() + "-" + fileChunk.getChunk().length + ";");
}
return fileChunk;
}
}
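
SideOutputMapFunction is deleted together with the delayed-chunk side output it served. For reference, here is a reconstruction of the removed wiring from the FileChunkCombiner hunks above; this is an excerpt rather than a runnable unit, and the windowTime/allowedLateness/fileMaxChunkCount locals stand in for the configuration lookups.

```java
// Late chunks were captured on a side output instead of being dropped:
OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {};

SingleOutputStreamOperator<FileChunk> windowStream = parseMessagePackStream
        .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
        .window(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
        .trigger(trigger)
        .sideOutputLateData(delayedChunkOutputTag)
        .allowedLateness(Time.seconds(allowedLateness))
        .process(new CombineChunkProcessWindowFunction(fileMaxChunkCount));

// Each sink then drained the side output through this map function:
windowStream.getSideOutput(delayedChunkOutputTag)
        .map(new SideOutputMapFunction())   // increments the delayed counter, resets chunkCount to 1
        .addSink(new HosSink(configuration))
        .name("Delayed Chunk");
```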

View File

@@ -75,8 +75,8 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
private List<Put> indexTimePutList;
private List<Put> indexFilenamePutList;
private long chunkSize;
private long maxBatchSize;
private long maxBatchCount;
private long batchSize;
private long batchInterval;
private ScheduledExecutorService executorService;
private long rateLimitThreshold;
private String rateLimitExpression;
@@ -162,24 +162,21 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
indexFilenameTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
}
timestamp = System.currentTimeMillis();
if (configuration.get(Configs.SINK_BATCH)) {
maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE);
maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT);
dataPutList = new ArrayList<>();
indexTimePutList = new ArrayList<>();
indexFilenamePutList = new ArrayList<>();
batchSize = configuration.getLong(Configs.SINK_HBASE_BATCH_SIZE);
batchInterval = configuration.getInteger(Configs.SINK_HBASE_BATCH_INTERVAL_MS);
dataPutList = new ArrayList<>();
indexTimePutList = new ArrayList<>();
indexFilenamePutList = new ArrayList<>();
if (batchSize > 0 && batchInterval > 0) {
chunkSize = 0;
executorService = Executors.newScheduledThreadPool(1);
long period = configuration.getInteger(Configs.SINK_BATCH_TIME);
executorService.scheduleWithFixedDelay(() -> {
if (System.currentTimeMillis() - timestamp > (period * 1000)) {
synchronized (this) {
if (!dataPutList.isEmpty()) {
synchronized (this) {
sendBatchData();
}
sendDataToHbase();
}
}
}, period, period, TimeUnit.SECONDS);
}, batchInterval, batchInterval, TimeUnit.MILLISECONDS);
}
rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
if (rateLimitThreshold > 0) {
@@ -217,7 +214,6 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
sendFileChunk(fileChunk);
}
} else {
timestamp = currentTimeMillis;
sendFileChunk(fileChunk);
}
}
@@ -236,7 +232,7 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
}
private void sendFileChunk(FileChunk fileChunk) {
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode()) && configuration.get(Configs.SINK_BATCH)) {
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
byte[] data = "".getBytes();
if (fileChunk.getChunk() != null) {
data = fileChunk.getChunk();
@@ -282,13 +278,17 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
chunksOutCounter.inc();
bytesOutCounter.inc(chunkLength);
calculateFileChunkMetrics(fileChunk);
if (chunkSize >= maxBatchSize || dataPutList.size() >= maxBatchCount) {
sendBatchData();
if (batchSize > 0 && batchInterval > 0) {
if (chunkSize >= batchSize) {
sendDataToHbase();
}
} else {
sendDataToHbase();
}
}
}
private void sendBatchData() {
private void sendDataToHbase() {
if (isAsync) {
List<CompletableFuture<Object>> futures = asyncTable.batch(dataPutList);
CompletableFuture.supplyAsync(() -> {
@@ -303,18 +303,27 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
return null;
});
dataPutList.clear();
asyncIndexTimeTable.batch(indexTimePutList);
indexTimePutList.clear();
asyncIndexFilenameTable.batch(indexFilenamePutList);
indexFilenamePutList.clear();
if (indexTimePutList.size() > 0) {
asyncIndexTimeTable.batch(indexTimePutList);
indexTimePutList.clear();
}
if (indexFilenamePutList.size() > 0) {
asyncIndexFilenameTable.batch(indexFilenamePutList);
indexFilenamePutList.clear();
}
} else {
try {
table.batch(dataPutList, null);
indexTimeTable.batch(indexTimePutList, null);
indexFilenameTable.batch(indexFilenamePutList, null);
if (indexTimePutList.size() > 0) {
indexTimeTable.batch(indexTimePutList, null);
}
if (indexFilenamePutList.size() > 0) {
indexFilenameTable.batch(indexFilenamePutList, null);
}
} catch (IOException | InterruptedException e) {
LOG.error("Put chunk to hbase error. ", e.getMessage());
errorChunksCounter.inc(dataPutList.size());
Thread.currentThread().interrupt();
} finally {
dataPutList.clear();
indexTimePutList.clear();
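
sendBatchData was renamed to sendDataToHbase and now skips empty index lists. Below is a condensed, hedged sketch of the synchronous flush path against the HBase 2.x client; the async path uses AsyncTable.batch analogously, and the error counter and logging are trimmed here.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

final class HBaseBatchFlush {

    private final List<Put> dataPutList = new ArrayList<>();
    private final List<Put> indexTimePutList = new ArrayList<>();
    private final Table table;
    private final Table indexTimeTable;

    HBaseBatchFlush(Table table, Table indexTimeTable) {
        this.table = table;
        this.indexTimeTable = indexTimeTable;
    }

    // Mirrors the synchronous branch above: batch non-empty lists, always clear.
    synchronized void flush() {
        try {
            if (!dataPutList.isEmpty()) {
                table.batch(dataPutList, null);
            }
            if (!indexTimePutList.isEmpty()) {
                indexTimeTable.batch(indexTimePutList, null);
            }
        } catch (IOException | InterruptedException e) {
            Thread.currentThread().interrupt();   // restore interrupt status, as above
        } finally {
            dataPutList.clear();                  // a batch is never re-sent
            indexTimePutList.clear();
        }
    }
}
```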

View File

@@ -79,8 +79,8 @@ public class HosSink extends RichSinkFunction<FileChunk> {
private String objectsMeta;
private String objectsOffset;
private List<byte[]> byteList;
private long maxBatchSize;
private long maxBatchCount;
private long batchSize;
private long batchInterval;
private long chunkSize;
private ScheduledExecutorService executorService;
private long rateLimitThreshold;
@@ -171,26 +171,23 @@ public class HosSink extends RichSinkFunction<FileChunk> {
syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient();
}
timestamp = System.currentTimeMillis();
if (configuration.get(Configs.SINK_BATCH)) {
batchSize = configuration.getLong(Configs.SINK_HOS_BATCH_SIZE);
batchInterval = configuration.getInteger(Configs.SINK_HOS_BATCH_INTERVAL_MS);
if (batchSize > 0 && batchInterval > 0) {
bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE);
maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT);
hosMessage = new HashMap<>();
byteList = new ArrayList<>();
objectsMeta = "";
objectsOffset = "";
chunkSize = 0;
executorService = Executors.newScheduledThreadPool(1);
long period = configuration.getInteger(Configs.SINK_BATCH_TIME);
executorService.scheduleWithFixedDelay(() -> {
if (System.currentTimeMillis() - timestamp > (period * 1000)) {
synchronized (this) {
if (!byteList.isEmpty()) {
synchronized (this) {
sendBatchData();
}
sendBatchData();
}
}
}, period, period, TimeUnit.SECONDS);
}, batchInterval, batchInterval, TimeUnit.MILLISECONDS);
}
rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
if (rateLimitThreshold > 0) {
@@ -228,7 +225,6 @@ public class HosSink extends RichSinkFunction<FileChunk> {
sendFileChunk(fileChunk);
}
} else {
timestamp = currentTimeMillis;
sendFileChunk(fileChunk);
}
}
@@ -250,7 +246,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
data = fileChunk.getChunk();
}
long chunkLength = data.length;
if (configuration.get(Configs.SINK_BATCH)) {
if (batchSize > 0 && batchInterval > 0) {
hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType());
hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid());
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
@@ -275,7 +271,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
chunksOutCounter.inc();
bytesOutCounter.inc(chunkLength);
calculateFileChunkMetrics(fileChunk);
if (chunkSize >= maxBatchSize || byteList.size() >= maxBatchCount) {
if (chunkSize >= batchSize) {
sendBatchData();
}
} else {
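
Both sinks replace the global sink.batch.* switches with per-sink size/interval pairs: batching is active only when both values are positive, and a flush happens when buffered bytes reach batchSize or, at the latest, every batchInterval milliseconds. A generic, hedged sketch of that size-or-time pattern; the class name and send() body are placeholders.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SizeOrTimeBatcher {

    private final long batchSize;      // bytes; <= 0 disables batching
    private final long batchInterval;  // milliseconds; <= 0 disables batching
    private final List<byte[]> buffer = new ArrayList<>();
    private long bufferedBytes;
    private ScheduledExecutorService executorService;

    SizeOrTimeBatcher(long batchSize, long batchInterval) {
        this.batchSize = batchSize;
        this.batchInterval = batchInterval;
        if (batchSize > 0 && batchInterval > 0) {
            executorService = Executors.newScheduledThreadPool(1);
            // Time-based flush: fixed delay in milliseconds, as in the hunks above.
            executorService.scheduleWithFixedDelay(() -> {
                synchronized (this) {
                    if (!buffer.isEmpty()) {
                        flush();
                    }
                }
            }, batchInterval, batchInterval, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void add(byte[] chunk) {
        if (batchSize > 0 && batchInterval > 0) {
            buffer.add(chunk);
            bufferedBytes += chunk.length;
            if (bufferedBytes >= batchSize) {   // size-based flush
                flush();
            }
        } else {
            send(Collections.singletonList(chunk));   // batching disabled: pass through
        }
    }

    private void flush() {
        send(new ArrayList<>(buffer));
        buffer.clear();
        bufferedBytes = 0;
    }

    private void send(List<byte[]> chunks) {
        // placeholder for the actual HTTP PUT / HBase batch call
    }
}
```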

View File

@@ -84,14 +84,14 @@ public class OssSinkByCaffeineCache extends RichSinkFunction<FileChunk> {
super.open(parameters);
MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "sink_oss");
endpointList = Arrays.asList(configuration.get(Configs.SINK_OSS_ENDPOINT).split(","));
isAsync = configuration.getBoolean(Configs.SINK_OSS_ASYNC);
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
if (isAsync) {
asyncHttpClient = HttpClientUtil.getInstance(configuration).getAsyncHttpClient();
asyncHttpClient.start();
} else {
syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient();
}
cache = CaffeineCacheUtil.getInstance(configuration).getCaffeineCache();
cache = CaffeineCacheUtil.getInstance(configuration.getLong(Configs.SINK_OSS_CACHE_SIZE), configuration.getLong(Configs.SINK_OSS_CACHE_TIME_MS)).getCaffeineCache();
metricGroup.gauge("cacheLength", (Gauge<Long>) () -> cache.estimatedSize());
lessThan1KBChunksCounter = metricGroup.counter("lessThan1KBChunksCount");
between1KBAnd5KBChunksCounter = metricGroup.counter("between1KBAnd5KBChunksCount");

View File

@@ -3,9 +3,7 @@ package com.zdjizhi.utils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;
import com.zdjizhi.config.Configs;
import com.zdjizhi.pojo.FileChunk;
import org.apache.flink.configuration.Configuration;
import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -15,23 +13,23 @@ public class CaffeineCacheUtil {
private static CaffeineCacheUtil caffeineCacheUtil = null;
private static Cache<String, FileChunk> caffeineCache = null;
private CaffeineCacheUtil(Configuration configuration) {
private CaffeineCacheUtil(Long cacheSize, Long cacheTimeMs) {
caffeineCache = Caffeine.newBuilder()
// .initialCapacity(configuration.getLong(Configs.FILE_META_CACHE_SIZE))
.maximumWeight(configuration.getLong(Configs.FILE_META_CACHE_SIZE))
// .initialCapacity(configuration.getLong(Configs.SINK_OSS_CACHE_SIZE))
.maximumWeight(cacheSize)
.weigher(new Weigher<String, FileChunk>() {
@Override
public @NonNegative int weigh(@NonNull String key, @NonNull FileChunk value) {
return (int) value.getLength();
}
})
.expireAfterWrite(configuration.get(Configs.FILE_META_CACHE_TIME), TimeUnit.SECONDS)
.expireAfterWrite(cacheTimeMs, TimeUnit.MILLISECONDS)
.build();
}
public static synchronized CaffeineCacheUtil getInstance(Configuration configuration) {
public static synchronized CaffeineCacheUtil getInstance(Long cacheSize, Long cacheTimeMs) {
if (null == caffeineCacheUtil) {
caffeineCacheUtil = new CaffeineCacheUtil(configuration);
caffeineCacheUtil = new CaffeineCacheUtil(cacheSize, cacheTimeMs);
}
return caffeineCacheUtil;
}
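
CaffeineCacheUtil now receives an explicit size and TTL instead of reading the removed FILE_META_* options, and expiry moves from seconds to milliseconds. A standalone sketch of the same builder shape; CacheDemo and the byte[] payload are illustrative, while the weigher/expireAfterWrite combination mirrors the code above.

```java
import java.util.concurrent.TimeUnit;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

final class CacheDemo {

    // maximumWeight caps total cached bytes because entries are weighed by payload length.
    static Cache<String, byte[]> build(long maxWeightBytes, long ttlMs) {
        return Caffeine.newBuilder()
                .maximumWeight(maxWeightBytes)
                .weigher((String key, byte[] value) -> value.length)
                .expireAfterWrite(ttlMs, TimeUnit.MILLISECONDS)
                .build();
    }

    public static void main(String[] args) {
        Cache<String, byte[]> cache = build(10L * 1024 * 1024, 60_000L);
        cache.put("uuid-1", new byte[]{1, 2, 3});
        byte[] hit = cache.getIfPresent("uuid-1");   // non-null until evicted or expired
        System.out.println(hit == null ? -1 : hit.length);
    }
}
```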

View File

@@ -22,8 +22,8 @@ public class HBaseConnectionUtil {
hbaseConfiguration.set(HConstants.ZOOKEEPER_QUORUM, configuration.getString(Configs.SINK_HBASE_ZOOKEEPER));
hbaseConfiguration.set(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, "2181");
hbaseConfiguration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
hbaseConfiguration.set(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, configuration.get(Configs.SINK_HBASE_RETRIES_NUMBER) + "");
hbaseConfiguration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, configuration.get(Configs.SINK_HBASE_RPC_TIMEOUT) + "");
hbaseConfiguration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, configuration.get(Configs.SINK_HBASE_RETRIES_NUMBER) + "");
hbaseConfiguration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, configuration.get(Configs.SINK_HBASE_RPC_TIMEOUT_MS) + "");
hbaseConfiguration.set(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, "1073741800");
hbaseConfiguration.set(ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY, configuration.get(Configs.SINK_HBASE_CLIENT_WRITE_BUFFER) + "");
hbaseConfiguration.set(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, configuration.get(Configs.SINK_HBASE_CLIENT_IPC_POOL_SIZE) + "");
@@ -51,6 +51,7 @@ public class HBaseConnectionUtil {
try {
asyncHBaseConnection = ConnectionFactory.createAsyncConnection(hbaseConfiguration).get();
} catch (ExecutionException | InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e.getMessage());
}
return asyncHBaseConnection;
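
The hunk above fixes the retry/timeout keys and, in the async factory, restores the thread's interrupt flag before rethrowing. A condensed sketch of both connection paths with those details; most configuration keys from the full method are omitted.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

final class HBaseConnectionDemo {

    static Connection sync(String zookeeperQuorum) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum);
        return ConnectionFactory.createConnection(conf);
    }

    static AsyncConnection async(Configuration conf) {
        try {
            return ConnectionFactory.createAsyncConnection(conf).get();
        } catch (ExecutionException | InterruptedException e) {
            // Re-set the flag so callers still see the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e.getMessage());
        }
    }
}
```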

View File

@@ -44,15 +44,15 @@ public class HttpClientUtil {
private RequestConfig getRequestConfig() {
return RequestConfig.custom()
.setConnectTimeout(configuration.get(Configs.SINK_HTTP_CONNECT_TIMEOUT))
.setConnectionRequestTimeout(configuration.get(Configs.SINK_HTTP_REQUEST_TIMEOUT))
.setSocketTimeout(configuration.get(Configs.SINK_HTTP_SOCKET_TIMEOUT))
.setConnectTimeout(configuration.get(Configs.SINK_HTTP_CLIENT_CONNECT_TIMEOUT_MS))
.setConnectionRequestTimeout(configuration.get(Configs.SINK_HTTP_CLIENT_REQUEST_TIMEOUT_MS))
.setSocketTimeout(configuration.get(Configs.SINK_HTTP_CLIENT_SOCKET_TIMEOUT_MS))
.build();
}
private HttpRequestRetryHandler getRetryHandler() {
return (exception, executionCount, context) -> {
if (executionCount >= configuration.get(Configs.SINK_HTTP_ERROR_RETRY)) {
if (executionCount >= configuration.get(Configs.SINK_HTTP_CLIENT_RETRIES_NUMBER)) {
return false;
}
if (exception instanceof NoHttpResponseException) {// retry if the server dropped the connection
@@ -106,9 +106,9 @@ public class HttpClientUtil {
// create the ConnectionManager and apply the connection configuration
connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
// set the maximum total number of connections
connManager.setMaxTotal(configuration.get(Configs.SINK_HTTP_MAX_TOTAL));
connManager.setMaxTotal(configuration.get(Configs.SINK_HTTP_CLIENT_MAX_TOTAL));
// set the maximum number of connections per route
connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HTTP_MAX_PER_ROUTE));
connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HTTP_CLIENT_MAX_PER_ROUTE));
} catch (KeyManagementException | NoSuchAlgorithmException e) {
throw new RuntimeException(e.getMessage());
}
@@ -146,8 +146,8 @@ public class HttpClientUtil {
.build();
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
connManager = new PoolingNHttpClientConnectionManager(ioReactor);
connManager.setMaxTotal(configuration.get(Configs.SINK_HTTP_MAX_TOTAL));
connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HTTP_MAX_PER_ROUTE));
connManager.setMaxTotal(configuration.get(Configs.SINK_HTTP_CLIENT_MAX_TOTAL));
connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HTTP_CLIENT_MAX_PER_ROUTE));
} catch (IOReactorException e) {
throw new RuntimeException(e.getMessage());
}
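
The renamed *_MS options map directly onto Apache HttpClient 4.x's RequestConfig and connection-pool settings. A standalone sketch wiring the new defaults together; the literals mirror the defaults declared in Configs above, and the real retry handler additionally inspects the exception type (retrying on NoHttpResponseException).

```java
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

final class HttpClientDemo {

    static CloseableHttpClient build() {
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(10_000)             // sink.http.client.connect.timeout.ms
                .setConnectionRequestTimeout(10_000)   // sink.http.client.request.timeout.ms
                .setSocketTimeout(60_000)              // sink.http.client.socket.timeout.ms
                .build();

        // Simplified: give up after 3 attempts (sink.http.client.retries.number).
        HttpRequestRetryHandler retryHandler =
                (exception, executionCount, context) -> executionCount < 3;

        PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
        connManager.setMaxTotal(2000);            // sink.http.client.max.total
        connManager.setDefaultMaxPerRoute(1000);  // sink.http.client.max.per.route

        return HttpClients.custom()
                .setConnectionManager(connManager)
                .setDefaultRequestConfig(requestConfig)
                .setRetryHandler(retryHandler)
                .build();
    }
}
```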