From 942acad964e954601cc8b81ea9eda1ad8f97388d Mon Sep 17 00:00:00 2001
From: houjinchuan
Date: Mon, 8 Jul 2024 10:07:07 +0800
Subject: [PATCH] Optimize configuration
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pom.xml | 2 +-
 .../java/com/zdjizhi/FileChunkCombiner.java | 122 ++++--------------
 src/main/java/com/zdjizhi/config/Configs.java | 96 ++++++--------
 .../CombineChunkProcessWindowFunction.java | 8 --
 .../map/ParseMessagePackMapFunction.java | 3 +-
 .../function/map/SideOutputMapFunction.java | 33 -----
 src/main/java/com/zdjizhi/sink/HBaseSink.java | 59 +++++----
 src/main/java/com/zdjizhi/sink/HosSink.java | 24 ++--
 .../zdjizhi/sink/OssSinkByCaffeineCache.java | 4 +-
 .../com/zdjizhi/utils/CaffeineCacheUtil.java | 14 +-
 .../zdjizhi/utils/HBaseConnectionUtil.java | 5 +-
 .../com/zdjizhi/utils/HttpClientUtil.java | 16 +--
 src/main/resources/common.properties | 110 ++++++++--------
 .../com/zdjizhi/FileChunkCombinerTests.java | 122 ++++++++----------
 14 files changed, 240 insertions(+), 378 deletions(-)
 delete mode 100644 src/main/java/com/zdjizhi/function/map/SideOutputMapFunction.java

diff --git a/pom.xml b/pom.xml
index f789285..bc48636 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
     <groupId>com.zdjizhi</groupId>
     <artifactId>file-chunk-combiner</artifactId>
-    <version>1.3.1</version>
+    <version>1.3.2</version>

diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
index f60fefe..6955039 100644
--- a/src/main/java/com/zdjizhi/FileChunkCombiner.java
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -6,31 +6,22 @@ import com.zdjizhi.function.*;
 import com.zdjizhi.function.map.ParseMessagePackMapFunction;
 import com.zdjizhi.function.map.ParseProxyFileMetaFlatMapFunction;
 import com.zdjizhi.function.map.ParseSessionFileMetaFlatMapFunction;
-import com.zdjizhi.function.map.SideOutputMapFunction;
 import com.zdjizhi.kafka.FileMetaKafkaConsumer;
 import com.zdjizhi.pojo.*;
 import com.zdjizhi.sink.*;
 import com.zdjizhi.kafka.KafkaConsumer;
-import com.zdjizhi.trigger.IdleTimeTrigger;
-import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger;
 import com.zdjizhi.trigger.LastChunkTrigger;
 import com.zdjizhi.trigger.MultipleTrigger;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.*;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.*;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.OutputTag;
-import java.time.Duration;
 import java.util.*;

 public class FileChunkCombiner {
@@ -41,68 +32,33 @@ public class FileChunkCombiner {
         final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
         environment.getConfig().setGlobalJobParameters(configuration);

-        SingleOutputStreamOperator<FileChunk> windowStream;
-        OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {
-        };
-        if (configuration.getInteger(Configs.COMBINER_WINDOW_TYPE) == 0) {
-            WatermarkStrategy<FileChunk> watermarkStrategy = WatermarkStrategy
-                    .forBoundedOutOfOrderness(Duration.ofSeconds(configuration.get(Configs.COMBINER_WINDOW_ALLOWED_LATENESS)))
-                    .withTimestampAssigner((FileChunk, timestamp) -> FileChunk.getTimestamp() / 1000);
+        SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
+                .addSource(KafkaConsumer.byteArrayConsumer(configuration))
+                .name(configuration.get(Configs.KAFKA_TOPIC))
+                .map(new ParseMessagePackMapFunction())
+                .name("Map: Parse Message Pack")
+                .filter(new FileChunkFilterFunction(configuration.getString(Configs.MAP_FILTER_EXPRESSION), "map_parse_message_pack"))
+                .name("Filter: Map");

-            SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
-                    .addSource(KafkaConsumer.byteArrayConsumer(configuration))
-                    .name(configuration.get(Configs.KAFKA_TOPIC))
-                    .map(new ParseMessagePackMapFunction())
-                    .name("Map: Parse Message Pack")
-                    .filter(new FileChunkFilterFunction(configuration.getString(Configs.MAP_FILTER_EXPRESSION), "map_parse_message_pack"))
-                    .name("Filter: Map")
-                    .assignTimestampsAndWatermarks(watermarkStrategy);
-
-            List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
-            triggers.add(EventTimeTrigger.create());
-            if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
-                triggers.add(LastChunkOrNoDataInTimeTrigger.of(configuration.get(Configs.COMBINER_WINDOW_IDLE_TIME) * 1000));
-            } else {
-                triggers.add(IdleTimeTrigger.of(configuration.get(Configs.COMBINER_WINDOW_IDLE_TIME) * 1000));
-            }
-            Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
-            windowStream = parseMessagePackStream
-                    .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
-                    .window(TumblingEventTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_TIME))))
-                    .trigger(trigger)
-                    .sideOutputLateData(delayedChunkOutputTag)
-                    .allowedLateness(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_ALLOWED_LATENESS)))
-                    .process(new CombineChunkProcessWindowFunction(configuration.get(Configs.FILE_MAX_CHUNK_COUNT)))
-                    .name("Window: Combine Chunk")
-                    .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM));
-        } else {
-            SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
-                    .addSource(KafkaConsumer.byteArrayConsumer(configuration))
-                    .name(configuration.get(Configs.KAFKA_TOPIC))
-                    .map(new ParseMessagePackMapFunction())
-                    .name("Map: Parse Message Pack")
-                    .filter(new FileChunkFilterFunction(configuration.getString(Configs.MAP_FILTER_EXPRESSION), "map_parse_message_pack"))
-                    .name("Filter: Map");
-
-            List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
-            triggers.add(ProcessingTimeTrigger.create());
-            if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
-                triggers.add(LastChunkTrigger.create());
-            }
-            Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
-            windowStream = parseMessagePackStream
-                    .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
-                    .window(TumblingProcessingTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_TIME))))
-                    .trigger(trigger)
-                    .process(new CombineChunkProcessWindowFunction(configuration.get(Configs.FILE_MAX_CHUNK_COUNT)))
-                    .name("Window: Combine Chunk")
-                    .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM));
+        List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
+        triggers.add(ProcessingTimeTrigger.create());
+        if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
+            triggers.add(LastChunkTrigger.create());
         }
+        Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
+        SingleOutputStreamOperator<FileChunk> windowStream = parseMessagePackStream
+                .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_SIZE))))
+                .trigger(trigger)
+                .process(new CombineChunkProcessWindowFunction())
+                .name("Window: Combine Chunk")
+                .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM));

+        SingleOutputStreamOperator<FileChunk> fileMetaSessionSingleOutputStreamOperator;
+        SingleOutputStreamOperator<FileChunk> fileMetaProxySingleOutputStreamOperator;
         for (String sinkType : configuration.get(Configs.SINK_TYPE).split(",")) {
             switch (sinkType) {
                 case "hos":
-                    DataStream<FileChunk> sideOutput = windowStream.getSideOutput(delayedChunkOutputTag);
                     if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
                         windowStream
                                 .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hos"))
@@ -111,24 +67,14 @@ public class FileChunkCombiner {
                                 .addSink(new HosSink(configuration))
                                 .name("Hos")
                                 .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
-                        sideOutput = sideOutput
-                                .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "side_out_put_hos"))
-                                .name("Filter: Delayed Chunk")
-                                .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
                     } else {
                         windowStream
                                 .addSink(new HosSink(configuration))
                                 .name("Hos")
                                 .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
                     }
-                    sideOutput.map(new SideOutputMapFunction())
-                            .setParallelism(configuration.get(Configs.SINK_PARALLELISM))
-                            .addSink(new HosSink(configuration))
-                            .setParallelism(configuration.get(Configs.SINK_PARALLELISM))
-                            .name("Delayed Chunk");
                     break;
                 case "hbase":
-                    sideOutput = windowStream.getSideOutput(delayedChunkOutputTag);
                     if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
                         windowStream
                                 .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hbase"))
@@ -137,42 +83,26 @@ public class FileChunkCombiner {
                                 .addSink(new HBaseSink(configuration))
                                 .name("HBase")
                                 .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
-                        sideOutput = sideOutput
-                                .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "side_out_put_hbase"))
-                                .name("Filter: Delayed Chunk")
-                                .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
                     } else {
                         windowStream
                                 .addSink(new HBaseSink(configuration))
                                 .name("HBase")
                                 .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
                     }
-                    sideOutput
-                            .map(new SideOutputMapFunction())
-                            .setParallelism(configuration.get(Configs.SINK_PARALLELISM))
-                            .addSink(new HBaseSink(configuration))
-                            .setParallelism(configuration.get(Configs.SINK_PARALLELISM))
-                            .name("Delayed Chunk");
                     break;
                 case "oss":
-                    SingleOutputStreamOperator<FileChunk> fileMetaSessionSingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC)))
-                            .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+                    fileMetaSessionSingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC)))
                             .name(configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC))
                             .flatMap(new ParseSessionFileMetaFlatMapFunction())
-                            .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
                             .name("Map: Parse Session File Meta")
                             .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_session_file_meta"))
-                            .name("Filter: Map")
-                            .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
-                    SingleOutputStreamOperator<FileChunk> fileMetaProxySingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC)))
-                            .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+                            .name("Filter: Map");
+                    fileMetaProxySingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC)))
                             .name(configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC))
                             .flatMap(new ParseProxyFileMetaFlatMapFunction())
-                            .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
                             .name("Map: Parse Proxy File Meta")
                             .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_proxy_file_meta"))
-                            .name("Filter: Map")
-                            .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
+                            .name("Filter: Map");
                     windowStream
                             .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_OSS_FILTER_EXPRESSION), "sink_oss"))
                             .name("Filter: Oss")
diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java
index 383d0c0..25ba9aa 100644
--- a/src/main/java/com/zdjizhi/config/Configs.java
+++ b/src/main/java/com/zdjizhi/config/Configs.java
@@ -43,25 +43,16 @@ public class Configs {
             .stringType()
             .noDefaultValue();

-    public static final ConfigOption<Integer> MAP_PARSE_FILE_META_PARALLELISM = ConfigOptions.key("map.parse.file.meta.parallelism")
-            .intType()
-            .defaultValue(1);
+    public static final ConfigOption<String> MAP_FILTER_EXPRESSION = ConfigOptions.key("map.filter.expression")
+            .stringType()
+            .defaultValue("");

-    public static final ConfigOption<Integer> COMBINER_WINDOW_TYPE = ConfigOptions.key("combiner.window.type")
-            .intType()
-            .defaultValue(0);
     public static final ConfigOption<Integer> COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism")
             .intType()
             .defaultValue(1);
-    public static final ConfigOption<Long> COMBINER_WINDOW_TIME = ConfigOptions.key("combiner.window.time")
+    public static final ConfigOption<Long> COMBINER_WINDOW_SIZE = ConfigOptions.key("combiner.window.size")
             .longType()
-            .defaultValue(5L);
-    public static final ConfigOption<Long> COMBINER_WINDOW_ALLOWED_LATENESS = ConfigOptions.key("combiner.window.allowed.lateness")
-            .longType()
-            .defaultValue(0L);
-    public static final ConfigOption<Long> COMBINER_WINDOW_IDLE_TIME = ConfigOptions.key("combiner.window.idle.time")
-            .longType()
-            .defaultValue(5L);
+            .defaultValue(10L);
     public static final ConfigOption<Boolean> COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER = ConfigOptions.key("combiner.window.enable.last.chunk.trigger")
             .booleanType()
             .defaultValue(true);
@@ -75,18 +66,6 @@ public class Configs {
     public static final ConfigOption<Boolean> SINK_ASYNC = ConfigOptions.key("sink.async")
             .booleanType()
             .defaultValue(false);
-    public static final ConfigOption<Boolean> SINK_BATCH = ConfigOptions.key("sink.batch")
-            .booleanType()
-            .defaultValue(false);
-    public static final ConfigOption<Integer> SINK_BATCH_COUNT = ConfigOptions.key("sink.batch.count")
-            .intType()
-            .defaultValue(1);
ConfigOptions.key("sink.batch.size") - .longType() - .defaultValue(Long.MAX_VALUE); - public static final ConfigOption SINK_BATCH_TIME = ConfigOptions.key("sink.batch.time") - .intType() - .defaultValue(5); public static final ConfigOption SINK_FILTER_EXPRESSION = ConfigOptions.key("sink.filter.expression") .stringType() .defaultValue(""); @@ -106,43 +85,39 @@ public class Configs { public static final ConfigOption SINK_HOS_TOKEN = ConfigOptions.key("sink.hos.token") .stringType() .defaultValue(""); + public static final ConfigOption SINK_HOS_BATCH_SIZE = ConfigOptions.key("sink.hos.batch.size") + .longType() + .defaultValue(0L); + public static final ConfigOption SINK_HOS_BATCH_INTERVAL_MS = ConfigOptions.key("sink.hos.batch.interval.ms") + .intType() + .defaultValue(0); - public static final ConfigOption SINK_HTTP_MAX_TOTAL = ConfigOptions.key("sink.http.max.total") + public static final ConfigOption SINK_HTTP_CLIENT_MAX_TOTAL = ConfigOptions.key("sink.http.client.max.total") .intType() - .defaultValue(2000); - public static final ConfigOption SINK_HTTP_MAX_PER_ROUTE = ConfigOptions.key("sink.http.max.per.route") + .defaultValue(100); + public static final ConfigOption SINK_HTTP_CLIENT_MAX_PER_ROUTE = ConfigOptions.key("sink.http.client.max.per.route") .intType() - .defaultValue(1000); - public static final ConfigOption SINK_HTTP_ERROR_RETRY = ConfigOptions.key("sink.http.error.retry") + .defaultValue(100); + public static final ConfigOption SINK_HTTP_CLIENT_RETRIES_NUMBER = ConfigOptions.key("sink.http.client.retries.number") .intType() .defaultValue(3); - public static final ConfigOption SINK_HTTP_CONNECT_TIMEOUT = ConfigOptions.key("sink.http.connect.timeout") + public static final ConfigOption SINK_HTTP_CLIENT_CONNECT_TIMEOUT_MS = ConfigOptions.key("sink.http.client.connect.timeout.ms") .intType() .defaultValue(10000); - public static final ConfigOption SINK_HTTP_REQUEST_TIMEOUT = ConfigOptions.key("sink.http.request.timeout") + public static final ConfigOption SINK_HTTP_CLIENT_REQUEST_TIMEOUT_MS = ConfigOptions.key("sink.http.client.request.timeout.ms") .intType() .defaultValue(10000); - public static final ConfigOption SINK_HTTP_SOCKET_TIMEOUT = ConfigOptions.key("sink.http.socket.timeout") + public static final ConfigOption SINK_HTTP_CLIENT_SOCKET_TIMEOUT_MS = ConfigOptions.key("sink.http.client.socket.timeout.ms") .intType() .defaultValue(60000); - public static final ConfigOption SINK_OSS_ASYNC = ConfigOptions.key("sink.oss.async") - .booleanType() - .defaultValue(false); - public static final ConfigOption SINK_OSS_ENDPOINT = ConfigOptions.key("sink.oss.endpoint") - .stringType() - .noDefaultValue(); - public static final ConfigOption SINK_OSS_FILTER_EXPRESSION = ConfigOptions.key("sink.oss.filter.expression") - .stringType() - .defaultValue(""); - public static final ConfigOption SINK_HBASE_ZOOKEEPER = ConfigOptions.key("sink.hbase.zookeeper") .stringType() - .defaultValue(""); + .defaultValue("127.0.0.1"); public static final ConfigOption SINK_HBASE_RETRIES_NUMBER = ConfigOptions.key("sink.hbase.retries.number") .intType() .defaultValue(10); - public static final ConfigOption SINK_HBASE_RPC_TIMEOUT = ConfigOptions.key("sink.hbase.rpc.timeout") + public static final ConfigOption SINK_HBASE_RPC_TIMEOUT_MS = ConfigOptions.key("sink.hbase.rpc.timeout.ms") .intType() .defaultValue(600000); public static final ConfigOption SINK_HBASE_CLIENT_WRITE_BUFFER = ConfigOptions.key("sink.hbase.client.write.buffer") @@ -150,18 +125,29 @@ public class Configs { .defaultValue(10485760); public 
    public static final ConfigOption<Integer> SINK_HBASE_CLIENT_IPC_POOL_SIZE = ConfigOptions.key("sink.hbase.client.ipc.pool.size")
             .intType()
-            .defaultValue(1);
-
-    public static final ConfigOption<Integer> FILE_MAX_CHUNK_COUNT = ConfigOptions.key("file.max.chunk.count")
+            .defaultValue(3);
+    public static final ConfigOption<Long> SINK_HBASE_BATCH_SIZE = ConfigOptions.key("sink.hbase.batch.size")
+            .longType()
+            .defaultValue(0L);
+    public static final ConfigOption<Integer> SINK_HBASE_BATCH_INTERVAL_MS = ConfigOptions.key("sink.hbase.batch.interval.ms")
             .intType()
-            .defaultValue(100000);
-    public static final ConfigOption<String> MAP_FILTER_EXPRESSION = ConfigOptions.key("map.filter.expression")
+            .defaultValue(0);
+
+    public static final ConfigOption<String> SINK_OSS_ENDPOINT = ConfigOptions.key("sink.oss.endpoint")
+            .stringType()
+            .noDefaultValue();
+    public static final ConfigOption<String> SINK_OSS_FILTER_EXPRESSION = ConfigOptions.key("sink.oss.filter.expression")
             .stringType()
             .defaultValue("");
+    public static final ConfigOption<Long> SINK_OSS_CACHE_TIME_MS = ConfigOptions.key("sink.oss.cache.time.ms")
+            .longType()
+            .defaultValue(0L);
+    public static final ConfigOption<Long> SINK_OSS_CACHE_SIZE = ConfigOptions.key("sink.oss.cache.size")
+            .longType()
+            .defaultValue(0L);
     public static final ConfigOption<String> FILE_META_FILTER_EXPRESSION = ConfigOptions.key("file.meta.filter.expression")
             .stringType()
             .defaultValue("");
-
     public static final ConfigOption<String> KAFKA_FILE_META_SESSION_TOPIC = ConfigOptions.key("source.kafka.file.meta.session.topic")
             .stringType()
             .noDefaultValue();
@@ -171,10 +157,4 @@ public class Configs {
     public static final ConfigOption<String> KAFKA_FILE_META_GROUP_ID = ConfigOptions.key("source.kafka.file.meta.group.id")
             .stringType()
             .defaultValue("file_chunk_combine_1");
-    public static final ConfigOption<Long> FILE_META_CACHE_TIME = ConfigOptions.key("file.meta.cache.time")
-            .longType()
-            .defaultValue(0L);
-    public static final ConfigOption<Long> FILE_META_CACHE_SIZE = ConfigOptions.key("file.meta.cache.size")
-            .longType()
-            .defaultValue(0L);
 }
diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
index fd1ca2b..736758d 100644
--- a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
+++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
@@ -36,11 +36,6 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow>
-                if (waitingToCombineChunkList.size() > fileMaxChunkCount) {
-                    break;
-                }
             }
             if (waitingToCombineChunkList.size() > 0) {
                 FileChunk fileChunk = combineChunk(waitingToCombineChunkList, originalFileChunkList.get(0).getUuid(), originalFileChunkList.get(0).getFileName(), originalFileChunkList.get(0).getFileType(), 0, "append", 0, originalFileChunkList.get(0).getMeta(), startTimestamp, timestampAndSizes.toString());
diff --git a/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java
index 06c1e5c..be556a4 100644
--- a/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java
+++ b/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java
@@ -188,9 +188,8 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChunk>
             Map<String, Object> metaMap = new HashMap<>();
             for (int i = 0; i < numFields; i++) {
diff --git a/src/main/java/com/zdjizhi/function/map/SideOutputMapFunction.java b/src/main/java/com/zdjizhi/function/map/SideOutputMapFunction.java
deleted file mode 100644
index 58236fc..0000000
---
 a/src/main/java/com/zdjizhi/function/map/SideOutputMapFunction.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.zdjizhi.function.map;
-
-import com.zdjizhi.pojo.FileChunk;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
-
-import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_APPEND;
-
-public class SideOutputMapFunction extends RichMapFunction<FileChunk, FileChunk> {
-
-    public transient Counter delayedChunksCounter;
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-        MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "side_out_put");
-        delayedChunksCounter = metricGroup.counter("delayedChunksCount");
-        metricGroup.meter("numChunksDelayPerSecond", new MeterView(delayedChunksCounter));
-    }
-
-    @Override
-    public FileChunk map(FileChunk fileChunk) {
-        delayedChunksCounter.inc();
-        fileChunk.setChunkCount(1);
-        if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
-            fileChunk.setChunkNumbers(fileChunk.getTimestamp() + "-" + fileChunk.getChunk().length + ";");
-        }
-        return fileChunk;
-    }
-}
diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java
index ffcef6e..3ba9d15 100644
--- a/src/main/java/com/zdjizhi/sink/HBaseSink.java
+++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java
@@ -75,8 +75,8 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
     private List<Put> indexTimePutList;
     private List<Put> indexFilenamePutList;
     private long chunkSize;
-    private long maxBatchSize;
-    private long maxBatchCount;
+    private long batchSize;
+    private long batchInterval;
     private ScheduledExecutorService executorService;
     private long rateLimitThreshold;
     private String rateLimitExpression;
@@ -162,24 +162,21 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
             indexFilenameTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
         }
         timestamp = System.currentTimeMillis();
-        if (configuration.get(Configs.SINK_BATCH)) {
-            maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE);
-            maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT);
-            dataPutList = new ArrayList<>();
-            indexTimePutList = new ArrayList<>();
-            indexFilenamePutList = new ArrayList<>();
+        batchSize = configuration.getLong(Configs.SINK_HBASE_BATCH_SIZE);
+        batchInterval = configuration.getInteger(Configs.SINK_HBASE_BATCH_INTERVAL_MS);
+        dataPutList = new ArrayList<>();
+        indexTimePutList = new ArrayList<>();
+        indexFilenamePutList = new ArrayList<>();
+        if (batchSize > 0 && batchInterval > 0) {
             chunkSize = 0;
             executorService = Executors.newScheduledThreadPool(1);
-            long period = configuration.getInteger(Configs.SINK_BATCH_TIME);
             executorService.scheduleWithFixedDelay(() -> {
-                if (System.currentTimeMillis() - timestamp > (period * 1000)) {
+                synchronized (this) {
                     if (!dataPutList.isEmpty()) {
-                        synchronized (this) {
-                            sendBatchData();
-                        }
+                        sendDataToHbase();
                     }
                 }
-            }, period, period, TimeUnit.SECONDS);
+            }, batchInterval, batchInterval, TimeUnit.MILLISECONDS);
         }
         if (rateLimitThreshold > 0) {
             rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
@@ -217,7 +214,6 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
                 sendFileChunk(fileChunk);
             }
         } else {
-            timestamp = currentTimeMillis;
             sendFileChunk(fileChunk);
         }
     }
@@ -236,7 +232,7 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
     }

     private void sendFileChunk(FileChunk fileChunk) {
-        if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode()) && configuration.get(Configs.SINK_BATCH)) {
+        if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
             byte[] data = "".getBytes();
             if (fileChunk.getChunk() != null) {
                 data = fileChunk.getChunk();
@@ -282,13 +278,17 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
             chunksOutCounter.inc();
             bytesOutCounter.inc(chunkLength);
             calculateFileChunkMetrics(fileChunk);
-            if (chunkSize >= maxBatchSize || dataPutList.size() >= maxBatchCount) {
-                sendBatchData();
+            if (batchSize > 0 && batchInterval > 0) {
+                if (chunkSize >= batchSize) {
+                    sendDataToHbase();
+                }
+            } else {
+                sendDataToHbase();
             }
         }
     }

-    private void sendBatchData() {
+    private void sendDataToHbase() {
         if (isAsync) {
             List<CompletableFuture<Void>> futures = asyncTable.batch(dataPutList);
             CompletableFuture.supplyAsync(() -> {
@@ -303,18 +303,27 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
                 return null;
             });
             dataPutList.clear();
-            asyncIndexTimeTable.batch(indexTimePutList);
-            indexTimePutList.clear();
-            asyncIndexFilenameTable.batch(indexFilenamePutList);
-            indexFilenamePutList.clear();
+            if (indexTimePutList.size() > 0) {
+                asyncIndexTimeTable.batch(indexTimePutList);
+                indexTimePutList.clear();
+            }
+            if (indexFilenamePutList.size() > 0) {
+                asyncIndexFilenameTable.batch(indexFilenamePutList);
+                indexFilenamePutList.clear();
+            }
         } else {
             try {
                 table.batch(dataPutList, null);
-                indexTimeTable.batch(indexTimePutList, null);
-                indexFilenameTable.batch(indexFilenamePutList, null);
+                if (indexTimePutList.size() > 0) {
+                    indexTimeTable.batch(indexTimePutList, null);
+                }
+                if (indexFilenamePutList.size() > 0) {
+                    indexFilenameTable.batch(indexFilenamePutList, null);
+                }
             } catch (IOException | InterruptedException e) {
", e.getMessage()); errorChunksCounter.inc(dataPutList.size()); + Thread.currentThread().interrupt(); } finally { dataPutList.clear(); indexTimePutList.clear(); diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java index b79794e..d34203f 100644 --- a/src/main/java/com/zdjizhi/sink/HosSink.java +++ b/src/main/java/com/zdjizhi/sink/HosSink.java @@ -79,8 +79,8 @@ public class HosSink extends RichSinkFunction { private String objectsMeta; private String objectsOffset; private List byteList; - private long maxBatchSize; - private long maxBatchCount; + private long batchSize; + private long batchInterval; private long chunkSize; private ScheduledExecutorService executorService; private long rateLimitThreshold; @@ -171,26 +171,23 @@ public class HosSink extends RichSinkFunction { syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient(); } timestamp = System.currentTimeMillis(); - if (configuration.get(Configs.SINK_BATCH)) { + batchSize = configuration.getLong(Configs.SINK_HOS_BATCH_SIZE); + batchInterval = configuration.getInteger(Configs.SINK_HOS_BATCH_INTERVAL_MS); + if (batchSize > 0 && batchInterval > 0) { bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile"; - maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE); - maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT); hosMessage = new HashMap<>(); byteList = new ArrayList<>(); objectsMeta = ""; objectsOffset = ""; chunkSize = 0; executorService = Executors.newScheduledThreadPool(1); - long period = configuration.getInteger(Configs.SINK_BATCH_TIME); executorService.scheduleWithFixedDelay(() -> { - if (System.currentTimeMillis() - timestamp > (period * 1000)) { + synchronized (this) { if (!byteList.isEmpty()) { - synchronized (this) { - sendBatchData(); - } + sendBatchData(); } } - }, period, period, TimeUnit.SECONDS); + }, batchInterval, batchInterval, TimeUnit.MILLISECONDS); } if (rateLimitThreshold > 0) { rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD); @@ -228,7 +225,6 @@ public class HosSink extends RichSinkFunction { sendFileChunk(fileChunk); } } else { - timestamp = currentTimeMillis; sendFileChunk(fileChunk); } } @@ -250,7 +246,7 @@ public class HosSink extends RichSinkFunction { data = fileChunk.getChunk(); } long chunkLength = data.length; - if (configuration.get(Configs.SINK_BATCH)) { + if (batchSize > 0 && batchInterval > 0) { hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType()); hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid()); if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) { @@ -275,7 +271,7 @@ public class HosSink extends RichSinkFunction { chunksOutCounter.inc(); bytesOutCounter.inc(chunkLength); calculateFileChunkMetrics(fileChunk); - if (chunkSize >= maxBatchSize || byteList.size() >= maxBatchCount) { + if (chunkSize >= batchSize) { sendBatchData(); } } else { diff --git a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java index 7d61967..0ef9785 100644 --- a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java +++ b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java @@ -84,14 +84,14 @@ public class OssSinkByCaffeineCache extends RichSinkFunction { super.open(parameters); MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "sink_oss"); endpointList = 
         endpointList = Arrays.asList(configuration.get(Configs.SINK_OSS_ENDPOINT).split(","));
-        isAsync = configuration.getBoolean(Configs.SINK_OSS_ASYNC);
+        isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
         if (isAsync) {
             asyncHttpClient = HttpClientUtil.getInstance(configuration).getAsyncHttpClient();
             asyncHttpClient.start();
         } else {
             syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient();
         }
-        cache = CaffeineCacheUtil.getInstance(configuration).getCaffeineCache();
+        cache = CaffeineCacheUtil.getInstance(configuration.getLong(Configs.SINK_OSS_CACHE_SIZE), configuration.getLong(Configs.SINK_OSS_CACHE_TIME_MS)).getCaffeineCache();
         metricGroup.gauge("cacheLength", (Gauge<Long>) () -> cache.estimatedSize());
         lessThan1KBChunksCounter = metricGroup.counter("lessThan1KBChunksCount");
         between1KBAnd5KBChunksCounter = metricGroup.counter("between1KBAnd5KBChunksCount");
diff --git a/src/main/java/com/zdjizhi/utils/CaffeineCacheUtil.java b/src/main/java/com/zdjizhi/utils/CaffeineCacheUtil.java
index 75c946e..9b499dd 100644
--- a/src/main/java/com/zdjizhi/utils/CaffeineCacheUtil.java
+++ b/src/main/java/com/zdjizhi/utils/CaffeineCacheUtil.java
@@ -3,9 +3,7 @@ package com.zdjizhi.utils;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Weigher;
-import com.zdjizhi.config.Configs;
 import com.zdjizhi.pojo.FileChunk;
-import org.apache.flink.configuration.Configuration;
 import org.checkerframework.checker.index.qual.NonNegative;
 import org.checkerframework.checker.nullness.qual.NonNull;

@@ -15,23 +13,23 @@ public class CaffeineCacheUtil {
     private static CaffeineCacheUtil caffeineCacheUtil = null;
     private static Cache<String, FileChunk> caffeineCache = null;

-    private CaffeineCacheUtil(Configuration configuration) {
+    private CaffeineCacheUtil(Long cacheSize, Long cacheTimeMs) {
         caffeineCache = Caffeine.newBuilder()
-//                .initialCapacity(configuration.getLong(Configs.FILE_META_CACHE_SIZE))
-                .maximumWeight(configuration.getLong(Configs.FILE_META_CACHE_SIZE))
+//                .initialCapacity(configuration.getLong(Configs.SINK_OSS_CACHE_SIZE))
+                .maximumWeight(cacheSize)
                 .weigher(new Weigher<String, FileChunk>() {
                     @Override
                     public @NonNegative int weigh(@NonNull String key, @NonNull FileChunk value) {
                         return (int) value.getLength();
                     }
                 })
-                .expireAfterWrite(configuration.get(Configs.FILE_META_CACHE_TIME), TimeUnit.SECONDS)
+                .expireAfterWrite(cacheTimeMs, TimeUnit.MILLISECONDS)
                 .build();
     }

-    public static synchronized CaffeineCacheUtil getInstance(Configuration configuration) {
+    public static synchronized CaffeineCacheUtil getInstance(Long cacheSize, Long cacheTimeMs) {
         if (null == caffeineCacheUtil) {
-            caffeineCacheUtil = new CaffeineCacheUtil(configuration);
+            caffeineCacheUtil = new CaffeineCacheUtil(cacheSize, cacheTimeMs);
         }
         return caffeineCacheUtil;
     }
diff --git a/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java b/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java
index 03d76b6..66ca03b 100644
--- a/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java
+++ b/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java
@@ -22,8 +22,8 @@ public class HBaseConnectionUtil {
         hbaseConfiguration.set(HConstants.ZOOKEEPER_QUORUM, configuration.getString(Configs.SINK_HBASE_ZOOKEEPER));
         hbaseConfiguration.set(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, "2181");
         hbaseConfiguration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
-        hbaseConfiguration.set(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, configuration.get(Configs.SINK_HBASE_RETRIES_NUMBER) + "");
-        hbaseConfiguration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, configuration.get(Configs.SINK_HBASE_RPC_TIMEOUT) + "");
+        hbaseConfiguration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, configuration.get(Configs.SINK_HBASE_RETRIES_NUMBER) + "");
+        hbaseConfiguration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, configuration.get(Configs.SINK_HBASE_RPC_TIMEOUT_MS) + "");
         hbaseConfiguration.set(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, "1073741800");
         hbaseConfiguration.set(ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY, configuration.get(Configs.SINK_HBASE_CLIENT_WRITE_BUFFER) + "");
         hbaseConfiguration.set(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, configuration.get(Configs.SINK_HBASE_CLIENT_IPC_POOL_SIZE) + "");
@@ -51,6 +51,7 @@ public class HBaseConnectionUtil {
         try {
             asyncHBaseConnection = ConnectionFactory.createAsyncConnection(hbaseConfiguration).get();
         } catch (ExecutionException | InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new RuntimeException(e.getMessage());
         }
         return asyncHBaseConnection;
diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
index cafd193..5df9b87 100644
--- a/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
+++ b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
@@ -44,15 +44,15 @@ public class HttpClientUtil {

     private RequestConfig getRequestConfig() {
         return RequestConfig.custom()
-                .setConnectTimeout(configuration.get(Configs.SINK_HTTP_CONNECT_TIMEOUT))
-                .setConnectionRequestTimeout(configuration.get(Configs.SINK_HTTP_REQUEST_TIMEOUT))
-                .setSocketTimeout(configuration.get(Configs.SINK_HTTP_SOCKET_TIMEOUT))
+                .setConnectTimeout(configuration.get(Configs.SINK_HTTP_CLIENT_CONNECT_TIMEOUT_MS))
+                .setConnectionRequestTimeout(configuration.get(Configs.SINK_HTTP_CLIENT_REQUEST_TIMEOUT_MS))
+                .setSocketTimeout(configuration.get(Configs.SINK_HTTP_CLIENT_SOCKET_TIMEOUT_MS))
                 .build();
     }

     private HttpRequestRetryHandler getRetryHandler() {
         return (exception, executionCount, context) -> {
-            if (executionCount >= configuration.get(Configs.SINK_HTTP_ERROR_RETRY)) {
+            if (executionCount >= configuration.get(Configs.SINK_HTTP_CLIENT_RETRIES_NUMBER)) {
                 return false;
             }
             if (exception instanceof NoHttpResponseException) {// retry if the server dropped the connection
@@ -106,9 +106,9 @@ public class HttpClientUtil {
             // create the ConnectionManager and apply the connection settings
             connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
             // maximum total connections
-            connManager.setMaxTotal(configuration.get(Configs.SINK_HTTP_MAX_TOTAL));
+            connManager.setMaxTotal(configuration.get(Configs.SINK_HTTP_CLIENT_MAX_TOTAL));
             // maximum connections per route
-            connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HTTP_MAX_PER_ROUTE));
+            connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HTTP_CLIENT_MAX_PER_ROUTE));
         } catch (KeyManagementException | NoSuchAlgorithmException e) {
             throw new RuntimeException(e.getMessage());
         }
@@ -146,8 +146,8 @@ public class HttpClientUtil {
                     .build();
             ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
             connManager = new PoolingNHttpClientConnectionManager(ioReactor);
-            connManager.setMaxTotal(configuration.get(Configs.SINK_HTTP_MAX_TOTAL));
-            connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HTTP_MAX_PER_ROUTE));
+            connManager.setMaxTotal(configuration.get(Configs.SINK_HTTP_CLIENT_MAX_TOTAL));
+            connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HTTP_CLIENT_MAX_PER_ROUTE));
         } catch (IOReactorException e) {
             throw new RuntimeException(e.getMessage());
         }
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index b710c00..cf989b9 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -1,70 +1,76 @@
 flink.job.name=agg_traffic_file_chunk_combine
-#9092 = no auth, 9095 = ssl, 9094 = sasl
-source.kafka.broker=192.168.41.29:9092
+
+#kafka source configuration
+#9092 = no auth, 9095 = ssl, 9094 = sasl
+source.kafka.broker=192.168.44.12:9092
 source.kafka.topic=TRAFFIC-FILE-STREAM-RECORD
-source.kafka.group.id=test1
-#earliest = consume from the beginning, latest = newest records only
-source.kafka.auto.offset.reset=earliest
+source.kafka.group.id=test
+#earliest = consume from the beginning, latest = newest records only
+source.kafka.auto.offset.reset=latest
 source.kafka.session.timeout.ms=60000
-#maximum number of records fetched from a partition per poll
+#maximum number of records fetched from a partition per poll
 source.kafka.max.poll.records=1000
-#maximum number of bytes the consumer fetches from a single partition at once
+#maximum number of bytes the consumer fetches from a single partition at once
 source.kafka.max.partition.fetch.bytes=31457280
 source.kafka.enable.auto.commit=true
-#kafka SASL username
+#kafka SASL username
 source.kafka.user=admin
-#kafka SASL and SSL password
+#kafka SASL and SSL password
 source.kafka.pin=galaxy2019
-#required for SSL
-source.kafka.tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
-source.kafka.file.meta.session.topic=SESSION-RECORD
-source.kafka.file.meta.proxy.topic=PROXY-EVENT
-source.kafka.file.meta.group.id=file_chunk_combine_1
-map.filter.expression=FileChunk.fileType == "eml" || (FileChunk.offset <= 10737 && FileChunk.fileType != "eml")
-map.parse.file.meta.parallelism=1
-#
-combiner.window.type=0
+#required for SSL
+source.kafka.tools.library=/opt/tsg/olap/topology/data/
+
+map.filter.expression=FileChunk.offset <= 1073741824
+
+#window configuration
 combiner.window.parallelism=1
-combiner.window.time=10
-combiner.window.allowed.lateness=10
-#trigger if no data has been written for this long
-combiner.window.idle.time=10
-combiner.window.enable.last.chunk.trigger=true
-file.max.chunk.count=100000
-file.meta.cache.time=30
-file.meta.cache.size=1073741824
-#file.meta.filter.expression=(FileChunk.meta.sled_ip == "172.18.10.168" && FileChunk.meta.fileId.contains("_9")) || (FileChunk.meta.sled_ip == "172.18.10.168" && FileChunk.meta.duration_ms < 60 && FileChunk.meta.contentLength < 1048576)
+#window size in seconds
+combiner.window.size=30
+#combiner.window.enable.last.chunk.trigger=true
+
+#sink parameters
 sink.parallelism=1
-#options: hos, oss, hbase
+#options: hos, oss, hbase
 sink.type=hos
-sink.async=false
-sink.batch=true
-sink.batch.count=1000
-sink.batch.size=1048576
-sink.batch.time=10
+sink.async=true
 #sink.filter.expression=
+#rate-limit configuration
 #sink.rate.limit.threshold=0
 #sink.rate.limit.exclusion.expression=FileChunk.fileType == "eml"
-#hos sink
-#for nginx or a single hos use ip:port; for multiple hos instances use ip1:port,ip2:port...
-sink.hos.endpoint=192.168.41.29:8186
+
+#hos sink configuration
+#for nginx or a single hos use ip:port; for multiple hos instances use ip1:port,ip2:port...
+sink.hos.endpoint=192.168.44.12:8186
 sink.hos.bucket=traffic_file_bucket
 sink.hos.token=c21f969b5f03d33d43e04f8f136e7682
-#oss sink
-#multiple addresses allowed: ip1:port,ip2:port...
-sink.oss.endpoint=192.168.41.29:8186
-#sink.oss.filter.expression=FileChunk.offset == 0 && FileChunk.lastChunkFlag == 1
-sink.oss.async=false
-#http
-sink.http.error.retry=3
-sink.http.max.total=10
-sink.http.max.per.route=10
-sink.http.connect.timeout=1000
-sink.http.request.timeout=5000
-sink.http.socket.timeout=60000
-#hbase sink
-sink.hbase.zookeeper=192.168.41.29
+#sink.hos.batch.size=1048576
+#sink.hos.batch.interval.ms=10000
+
+#http client configuration
+sink.http.client.retries.number=3
+sink.http.client.max.total=100
+sink.http.client.max.per.route=100
+sink.http.client.connect.timeout.ms=10000
+sink.http.client.request.timeout.ms=10000
+sink.http.client.socket.timeout.ms=60000
+
+#hbase sink configuration
+sink.hbase.zookeeper=192.168.44.12
 sink.hbase.retries.number=10
-sink.hbase.rpc.timeout=600000
+sink.hbase.rpc.timeout.ms=600000
 sink.hbase.client.write.buffer=10971520
-sink.hbase.client.ipc.pool.size=3
\ No newline at end of file
+sink.hbase.client.ipc.pool.size=3
+sink.hbase.batch.size=1048576
+sink.hbase.batch.interval.ms=10000
+
+#oss sink configuration
+#multiple addresses allowed: ip1:port,ip2:port...
+#sink.oss.endpoint=192.168.44.12:8186
+#sink.oss.filter.expression=FileChunk.offset == 0 && FileChunk.lastChunkFlag == 1
+#sink.oss.cache.time.ms=30000
+#sink.oss.cache.size=1073741824
+
+#file meta association configuration
+#source.kafka.file.meta.session.topic=SESSION-RECORD
+#source.kafka.file.meta.proxy.topic=PROXY-EVENT
+#source.kafka.file.meta.group.id=file_chunk_combine_1
+#file.meta.filter.expression=FileChunk.meta.fileId.contains("_9")
\ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java
index 959db5c..b2df5cf 100644
--- a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java
+++ b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java
@@ -6,18 +6,14 @@ import cn.hutool.core.util.RandomUtil;
 import com.zdjizhi.config.Configs;
 import com.zdjizhi.function.*;
 import com.zdjizhi.function.map.ParseMessagePackMapFunction;
-import com.zdjizhi.function.map.SideOutputMapFunction;
 import com.zdjizhi.pojo.FileChunk;
 import com.zdjizhi.sink.HBaseSink;
 import com.zdjizhi.sink.HosSink;
 import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger;
+import com.zdjizhi.trigger.LastChunkTrigger;
 import com.zdjizhi.trigger.MultipleTrigger;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.RuntimeContext;
+import com.zdjizhi.utils.PublicUtil;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -26,39 +22,30 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.operators.*;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
-import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 import org.junit.*;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;

 import java.io.*;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;

-import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_APPEND;
-
 public class FileChunkCombinerTests {
     private File emlFile;
     private byte[] emlFileBytes;
@@ -111,7 +98,7 @@ public class FileChunkCombinerTests {
         triggers.add(EventTimeTrigger.create());
         triggers.add(LastChunkOrNoDataInTimeTrigger.of(1000));
         Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
-        processWindowFunction = new CombineChunkProcessWindowFunction(Integer.MAX_VALUE);
+        processWindowFunction = new CombineChunkProcessWindowFunction();
         delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {
         };
         DataStreamSource<FileChunk> source = env.fromCollection(inputFileChunks);
@@ -165,34 +152,6 @@ public class FileChunkCombinerTests {
         testHarness.close();
     }

-    @Test
-    public void testSideOutputMapFunction() throws Exception {
-        SideOutputMapFunction sideOutputMapFunction = new SideOutputMapFunction();
-        OneInputStreamOperatorTestHarness<FileChunk, FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(sideOutputMapFunction));
-        testHarness.setup();
-        testHarness.open();
-        for (FileChunk fileChunk : inputFileChunks) {
-            testHarness.processElement(new StreamRecord<>(fileChunk));
-        }
-        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-        for (FileChunk fileChunk : inputFileChunks) {
-            fileChunk.setChunkCount(1);
-            if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
-                fileChunk.setChunkNumbers(fileChunk.getTimestamp() + "-" + fileChunk.getChunk().length + ";");
-            }
-            expectedOutput.add(new StreamRecord<>(fileChunk));
-        }
-        ConcurrentLinkedQueue<Object> actualOutput = testHarness.getOutput();
-        Assert.assertEquals(30, actualOutput.size());
-        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, (o1, o2) -> {
-            StreamRecord sr0 = (StreamRecord) o1;
-            StreamRecord sr1 = (StreamRecord) o2;
-            return ((FileChunk) sr0.getValue()).getUuid().compareTo(((FileChunk) sr1.getValue()).getUuid());
-        });
-        Assert.assertEquals(30, sideOutputMapFunction.delayedChunksCounter.getCount());
-        testHarness.close();
-    }
-
     @Test
     public void testFileChunkFilterFunction() throws Exception {
         FileChunkFilterFunction fileChunkFilterFunction = new FileChunkFilterFunction("FileChunk.fileType == \"eml\"", "test");
@@ -331,11 +290,13 @@ public class FileChunkCombinerTests {
         testHarness.close();
     }

+    //tests the hos sink; requires a reachable hos endpoint
     @Test
     public void testHosSink() throws Exception {
         //test single-chunk upload
         configuration.setString(Configs.SINK_TYPE, "hos");
-        configuration.setBoolean(Configs.SINK_BATCH, false);
+        configuration.setLong(Configs.SINK_HOS_BATCH_SIZE, 0L);
+        configuration.setInteger(Configs.SINK_HOS_BATCH_INTERVAL_MS, 0);
         HosSink hosSink = new HosSink(configuration);
         StreamSink<FileChunk> fileChunkStreamSink = new StreamSink<>(hosSink);
         OneInputStreamOperatorTestHarness<FileChunk, Object> testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink);
         testHarness.setup();
@@ -343,7 +304,7 @@ public class FileChunkCombinerTests {
         testHarness.open();
         byte[] data = RandomUtil.randomString(1000).getBytes();
         //seek-mode file
-        FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000);
+        FileChunk fileChunk = new FileChunk(PublicUtil.getUUID(), "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000);
         testHarness.processElement(new StreamRecord<>(fileChunk));
         Assert.assertEquals(1, hosSink.chunksInCounter.getCount());
         Assert.assertEquals(1, hosSink.chunksOutCounter.getCount());
@@ -356,7 +317,7 @@ public class FileChunkCombinerTests {
         Assert.assertEquals(0, hosSink.between100KBAnd1MBChunksCounter.getCount());
         Assert.assertEquals(0, hosSink.greaterThan1MBChunksCounter.getCount());
         //append-mode file
-        fileChunk = new FileChunk("0000000002", "pcapng", data.length, data, "append", 5, System.currentTimeMillis() * 1000, pcapngFileMeta, "1-200,2-200,3-200,4-200,5-200");
+        fileChunk = new FileChunk(PublicUtil.getUUID(), "pcapng", data.length, data, "append", 5, System.currentTimeMillis() * 1000, pcapngFileMeta, "1-200,2-200,3-200,4-200,5-200");
         testHarness.processElement(new StreamRecord<>(fileChunk));
         Assert.assertEquals(2, hosSink.chunksInCounter.getCount());
         Assert.assertEquals(2, hosSink.chunksOutCounter.getCount());
@@ -373,19 +334,18 @@ public class FileChunkCombinerTests {
         //test batch upload
         data = RandomUtil.randomString(10000).getBytes();
         configuration.setString(Configs.SINK_TYPE, "hos");
-        configuration.setBoolean(Configs.SINK_BATCH, true);
-        configuration.setInteger(Configs.SINK_BATCH_COUNT, 10);
-        configuration.setInteger(Configs.SINK_BATCH_TIME, 2);
+        configuration.setLong(Configs.SINK_HOS_BATCH_SIZE, 1024*1024L);
+        configuration.setInteger(Configs.SINK_HOS_BATCH_INTERVAL_MS, 2000);
         hosSink = new HosSink(configuration);
         fileChunkStreamSink = new StreamSink<>(hosSink);
         testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink);
         testHarness.setup();
         testHarness.open();
-        fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000);
+        fileChunk = new FileChunk(PublicUtil.getUUID(), "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000);
         testHarness.processElement(new StreamRecord<>(fileChunk));
-        fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000);
+        fileChunk = new FileChunk(PublicUtil.getUUID(), "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis() * 1000);
         testHarness.processElement(new StreamRecord<>(fileChunk));
-        Thread.sleep(configuration.getInteger(Configs.SINK_BATCH_TIME) * 1000L + 1000);
+        Thread.sleep(configuration.getInteger(Configs.SINK_HOS_BATCH_INTERVAL_MS) + 1000);
         Assert.assertEquals(2, hosSink.chunksInCounter.getCount());
         Assert.assertEquals(2, hosSink.chunksOutCounter.getCount());
         Assert.assertEquals(0, hosSink.errorChunksCounter.getCount());
@@ -399,23 +359,48 @@ public class FileChunkCombinerTests {
         testHarness.close();
     }

+    //tests the hbase sink; requires a reachable hbase cluster
     @Test
     public void testHBaseSink() throws Exception {
+        //test single-chunk upload
         configuration.setString(Configs.SINK_TYPE, "hbase");
-        configuration.setBoolean(Configs.SINK_BATCH, true);
-        configuration.setInteger(Configs.SINK_BATCH_COUNT, 10);
-        configuration.setInteger(Configs.SINK_BATCH_TIME, 2);
+        configuration.setLong(Configs.SINK_HBASE_BATCH_SIZE, 0L);
+        configuration.setInteger(Configs.SINK_HBASE_BATCH_INTERVAL_MS, 0);
         HBaseSink hBaseSink = new HBaseSink(configuration);
         StreamSink<FileChunk> fileChunkStreamSink = new StreamSink<>(hBaseSink);
         OneInputStreamOperatorTestHarness<FileChunk, Object> testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink);
         testHarness.setup();
         testHarness.open();
         byte[] data = RandomUtil.randomString(1000).getBytes();
-        FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
+        FileChunk fileChunk = new FileChunk(PublicUtil.getUUID(), "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
         testHarness.processElement(new StreamRecord<>(fileChunk));
-        fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
+        Assert.assertEquals(1, hBaseSink.chunksInCounter.getCount());
+        Assert.assertEquals(1, hBaseSink.chunksOutCounter.getCount());
+        Assert.assertEquals(0, hBaseSink.errorChunksCounter.getCount());
+        Assert.assertEquals(1, hBaseSink.filesCounter.getCount());
+        Assert.assertEquals(1, hBaseSink.lessThan1KBChunksCounter.getCount());
+        Assert.assertEquals(0, hBaseSink.between1KBAnd5KBChunksCounter.getCount());
+        Assert.assertEquals(0, hBaseSink.between5KBAnd10KBChunksCounter.getCount());
+        Assert.assertEquals(0, hBaseSink.between10KBAnd100KBChunksCounter.getCount());
+        Assert.assertEquals(0, hBaseSink.between100KBAnd1MBChunksCounter.getCount());
+        Assert.assertEquals(0, hBaseSink.greaterThan1MBChunksCounter.getCount());
+        testHarness.close();
+
+        //test batch upload
+        configuration.setString(Configs.SINK_TYPE, "hbase");
+        configuration.setLong(Configs.SINK_HBASE_BATCH_SIZE, 1024*1024L);
+        configuration.setInteger(Configs.SINK_HBASE_BATCH_INTERVAL_MS, 2000);
+        hBaseSink = new HBaseSink(configuration);
+        fileChunkStreamSink = new StreamSink<>(hBaseSink);
+        testHarness = new OneInputStreamOperatorTestHarness<>(fileChunkStreamSink);
+        testHarness.setup();
+        testHarness.open();
+        data = RandomUtil.randomString(1000).getBytes();
+        fileChunk = new FileChunk(PublicUtil.getUUID(), "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
         testHarness.processElement(new StreamRecord<>(fileChunk));
-        Thread.sleep(configuration.getInteger(Configs.SINK_BATCH_TIME) * 1000L + 1000);
+        fileChunk = new FileChunk(PublicUtil.getUUID(), "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
+        testHarness.processElement(new StreamRecord<>(fileChunk));
+        Thread.sleep(configuration.getInteger(Configs.SINK_HBASE_BATCH_INTERVAL_MS) + 1000);
         Assert.assertEquals(2, hBaseSink.chunksInCounter.getCount());
         Assert.assertEquals(2, hBaseSink.chunksOutCounter.getCount());
         Assert.assertEquals(0, hBaseSink.errorChunksCounter.getCount());
@@ -633,21 +618,20 @@ public class FileChunkCombinerTests {
     private StreamExecutionEnvironment createPipeline(int parallelism, SourceFunction<byte[]> source, long windowTime, long windowIdleTime) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(parallelism);
-        WatermarkStrategy<FileChunk> watermarkStrategy = WatermarkStrategy
-                .forBoundedOutOfOrderness(Duration.ofSeconds(0))
-                .withTimestampAssigner((FileChunk, timestamp) -> FileChunk.getTimestamp() / 1000);
+
         List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
-        triggers.add(EventTimeTrigger.create());
-        triggers.add(LastChunkOrNoDataInTimeTrigger.of(windowIdleTime * 1000));
+        triggers.add(ProcessingTimeTrigger.create());
+        if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
+            triggers.add(LastChunkTrigger.create());
+        }
         Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
         env.addSource(source)
                 .map(new ParseMessagePackMapFunction())
                 .filter(new FileChunkFilterFunction("", "test"))
-                .assignTimestampsAndWatermarks(watermarkStrategy)
                 .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
-                .window(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(windowTime)))
                 .trigger(trigger)
-                .process(new CombineChunkProcessWindowFunction(Integer.MAX_VALUE))
+                .process(new CombineChunkProcessWindowFunction())
                 .addSink(new CollectSink());
         return env;
     }
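-- 
Usage note: after this change, batching in HosSink and HBaseSink is driven by a per-sink size/interval pair instead of the removed sink.batch.* options. Batching is active only when both values are greater than 0: a flush fires when the buffered bytes reach the size threshold, and a scheduled flush fires every interval. A minimal sketch of the new knobs with illustrative values (the defaults of 0 leave batching disabled, so every chunk is written through immediately):

    sink.hos.batch.size=1048576
    sink.hos.batch.interval.ms=10000
    sink.hbase.batch.size=1048576
    sink.hbase.batch.interval.ms=10000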