Support exempting important files from rate limiting based on an expression

houjinchuan
2024-03-14 16:31:54 +08:00
parent 38e1049fa0
commit 0243e2b686
5 changed files with 44 additions and 18 deletions
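
In short: with rate limiting enabled, chunks beyond the per-second threshold are dropped, and this commit lets chunks whose parsed metadata matches a configurable JEXL expression (rate.limit.exclusion.expression, e.g. FileChunk.fileType == "eml") pass through instead of being dropped. A minimal sketch of that decision as a standalone helper; shouldDrop, overThreshold and matchesExclusionExpression are invented names for illustration, while the committed code folds this logic directly into ParseMessagePackMapFunction.map():

    public final class RateLimitDecisionSketch {

        private RateLimitDecisionSketch() {
        }

        // Illustration only: "overThreshold" stands for the existing per-second
        // counter check, "matchesExclusionExpression" for evaluating
        // rate.limit.exclusion.expression against the parsed FileChunk.
        static boolean shouldDrop(boolean enableRateLimit,
                                  boolean overThreshold,
                                  boolean matchesExclusionExpression) {
            if (!enableRateLimit || !overThreshold) {
                return false;                   // under the limit: always keep the chunk
            }
            return !matchesExclusionExpression; // over the limit: keep only excluded ("important") files
        }
    }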

View File

@@ -40,7 +40,7 @@ public class FileChunkCombiner {
         SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
                 .addSource(KafkaConsumer.byteArrayConsumer(configuration))
                 .name("Kafka Source")
-                .map(new ParseMessagePackMapFunction(configuration.get(Configs.MAP_ENABLE_RATE_LIMIT), configuration.get(Configs.MAP_RATE_LIMIT_THRESHOLD)))
+                .map(new ParseMessagePackMapFunction(configuration.get(Configs.ENABLE_RATE_LIMIT), configuration.get(Configs.RATE_LIMIT_THRESHOLD), configuration.get(Configs.RATE_LIMIT_EXCLUSION_EXPRESSION)))
                 .name("Map: Parse Message Pack")
                 .filter(new FileChunkFilterFunction(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
                 .assignTimestampsAndWatermarks(watermarkStrategy);

View File

@@ -43,12 +43,15 @@ public class Configs {
             .stringType()
             .noDefaultValue();
-    public static final ConfigOption<Boolean> MAP_ENABLE_RATE_LIMIT = ConfigOptions.key("map.enable.rate.limit")
+    public static final ConfigOption<Boolean> ENABLE_RATE_LIMIT = ConfigOptions.key("enable.rate.limit")
             .booleanType()
             .defaultValue(false);
-    public static final ConfigOption<Long> MAP_RATE_LIMIT_THRESHOLD = ConfigOptions.key("map.rate.limit.threshold")
+    public static final ConfigOption<Long> RATE_LIMIT_THRESHOLD = ConfigOptions.key("rate.limit.threshold")
             .longType()
             .defaultValue(Long.MAX_VALUE);
+    public static final ConfigOption<String> RATE_LIMIT_EXCLUSION_EXPRESSION = ConfigOptions.key("rate.limit.exclusion.expression")
+            .stringType()
+            .defaultValue("");
     public static final ConfigOption<Integer> COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism")
             .intType()

View File

@@ -1,10 +1,12 @@
 package com.zdjizhi.function;
+import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.zdjizhi.pojo.FileChunk;
+import org.apache.commons.jexl3.*;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.*;
@@ -18,18 +20,21 @@ import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_APPEND;
 public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChunk> {
     private static final Log LOG = LogFactory.get();
+    private final boolean enableRateLimit;
+    private final long rateLimitThreshold;
+    private final String rateLimitExpression;
     public transient Counter parseMessagePackCounter;
     public transient Counter parseMessagePackErrorCounter;
     public transient Counter rateLimitDropCounter;
-    private final boolean enableRateLimit;
-    private final long rateLimitThreshold;
     private long timestamp;
     private long count;
-    public ParseMessagePackMapFunction(boolean enableRateLimit, long rateLimitThreshold) {
+    private JexlExpression jexlExpression;
+    private JexlContext jexlContext;
+    public ParseMessagePackMapFunction(boolean enableRateLimit, long rateLimitThreshold, String rateLimitExpression) {
         this.rateLimitThreshold = rateLimitThreshold;
         this.enableRateLimit = enableRateLimit;
+        this.rateLimitExpression = rateLimitExpression;
     }
     @Override
@Override @Override
@@ -44,24 +49,37 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChunk> {
         metricGroup.meter("rateLimitDropPerSecond", new MeterView(rateLimitDropCounter));
         timestamp = System.currentTimeMillis();
         count = 0;
+        JexlEngine jexlEngine = new JexlBuilder().create();
+        jexlExpression = jexlEngine.createExpression(rateLimitExpression);
+        jexlContext = new MapContext();
     }
     @Override
     public FileChunk map(byte[] messagePackData) {
-        FileChunk fileChunk = null;
+        FileChunk fileChunk = parseMessagePack(messagePackData);
         if (enableRateLimit) {
             count++;
-            if (System.currentTimeMillis() - timestamp < 1000 && count <= rateLimitThreshold) {
-                fileChunk = parseMessagePack(messagePackData);
-            } else if (System.currentTimeMillis() - timestamp < 1000 && count > rateLimitThreshold) {
+            if (System.currentTimeMillis() - timestamp < 1000 && count > rateLimitThreshold) {
+                if (StrUtil.isNotEmpty(rateLimitExpression)) {
+                    jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
+                    if (Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
+                        return fileChunk;
+                    }
+                }
                 rateLimitDropCounter.inc();
-            } else {
+                fileChunk = null;
+            } else if (System.currentTimeMillis() - timestamp >= 1000) {
+                if (StrUtil.isNotEmpty(rateLimitExpression)) {
+                    jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
+                    if (Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
+                        return fileChunk;
+                    }
+                }
                 rateLimitDropCounter.inc();
+                fileChunk = null;
                 timestamp = System.currentTimeMillis();
                 count = 0;
             }
-        } else {
-            fileChunk = parseMessagePack(messagePackData);
         }
         return fileChunk;
     }
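
The exclusion check relies on Apache Commons JEXL 3: the expression is compiled once in open(), and each over-threshold chunk is bound into a MapContext under its simple class name before evaluation. Below is a self-contained sketch of just that evaluation path; the nested FileChunk class is a stand-in with only the one getter the sample expression needs, not the real com.zdjizhi.pojo.FileChunk:

    import org.apache.commons.jexl3.JexlBuilder;
    import org.apache.commons.jexl3.JexlContext;
    import org.apache.commons.jexl3.JexlEngine;
    import org.apache.commons.jexl3.JexlExpression;
    import org.apache.commons.jexl3.MapContext;

    public class RateLimitExclusionSketch {

        // Stand-in POJO: JEXL resolves "FileChunk.fileType" through the getFileType() bean getter.
        public static class FileChunk {
            private final String fileType;

            public FileChunk(String fileType) {
                this.fileType = fileType;
            }

            public String getFileType() {
                return fileType;
            }
        }

        public static void main(String[] args) {
            JexlEngine jexlEngine = new JexlBuilder().create();
            JexlExpression expression = jexlEngine.createExpression("FileChunk.fileType == \"eml\"");

            FileChunk chunk = new FileChunk("eml");
            JexlContext context = new MapContext();
            // The map function registers the chunk under its simple class name.
            context.set(chunk.getClass().getSimpleName(), chunk);

            // Same Boolean.parseBoolean(...) coercion the commit uses.
            boolean keep = Boolean.parseBoolean(expression.evaluate(context).toString());
            System.out.println(keep);   // true -> this chunk would bypass the rate-limit drop
        }
    }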

View File

@@ -18,8 +18,13 @@ source.kafka.user=admin
 source.kafka.pin=galaxy2019
 # required for SSL
 source.kafka.tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
-map.enable.rate.limit=false
-map.rate.limit.threshold=10000
+enable.rate.limit=false
+rate.limit.threshold=10000
+rate.limit.exclusion.expression=FileChunk.fileType == "eml"
+file.max.chunk.count=100000
+file.max.size=1073741824
+# filter by field, Java expression
+#filter.expression=FileChunk.fileType == "txt" || FileChunk.fileType == "eml"
 # combiner window settings
 combiner.window.parallelism=2
 combiner.window.time=10

View File

@@ -122,7 +122,7 @@ public class FileChunkCombinerTests {
     @Test
     public void testParseMessagePackMapFunction() throws Exception {
-        ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(false, Long.MAX_VALUE);
+        ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(false, Long.MAX_VALUE, "");
         OneInputStreamOperatorTestHarness<byte[], FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(mapFunction));
         testHarness.setup();
         testHarness.open();
@@ -617,7 +617,7 @@ public class FileChunkCombinerTests {
         triggers.add(LastChunkOrNoDataInTimeTrigger.of(windowIdleTime * 1000));
         Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
         env.addSource(source)
-                .map(new ParseMessagePackMapFunction(false, Long.MAX_VALUE))
+                .map(new ParseMessagePackMapFunction(false, Long.MAX_VALUE, ""))
                 .filter(new FileChunkFilterFunction(Long.MAX_VALUE, ""))
                 .assignTimestampsAndWatermarks(watermarkStrategy)
                 .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
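
A possible way to exercise the new exclusion path in this test class (hypothetical, not part of the commit): enable rate limiting with a threshold of 0 so every element is over the limit, and let the expression decide what survives. The fragment below is meant to sit inside FileChunkCombinerTests and reuse its existing imports and MessagePack fixtures:

    // Hypothetical variation on testParseMessagePackMapFunction(): with a threshold
    // of 0 every element exceeds the limit, so only chunks matching the exclusion
    // expression should come out of the harness as non-null FileChunk values.
    ParseMessagePackMapFunction mapFunction =
            new ParseMessagePackMapFunction(true, 0L, "FileChunk.fileType == \"eml\"");
    OneInputStreamOperatorTestHarness<byte[], FileChunk> testHarness =
            new OneInputStreamOperatorTestHarness<>(new StreamMap<>(mapFunction));
    testHarness.setup();
    testHarness.open();
    // process the same MessagePack byte[] fixtures the existing test feeds in,
    // then inspect testHarness.getOutput() for the surviving "eml" chunks.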