优化监控指标名,增加文件大小分布监控
This commit is contained in:
2
pom.xml
2
pom.xml
@@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
<groupId>com.zdjizhi</groupId>
|
<groupId>com.zdjizhi</groupId>
|
||||||
<artifactId>file-chunk-combiner</artifactId>
|
<artifactId>file-chunk-combiner</artifactId>
|
||||||
<version>1.2.0</version>
|
<version>1.3.0</version>
|
||||||
|
|
||||||
<repositories>
|
<repositories>
|
||||||
<repository>
|
<repository>
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ public class FileChunkCombiner {
|
|||||||
.filter(new FileChunkFilterFunction(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
|
.filter(new FileChunkFilterFunction(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
|
||||||
.assignTimestampsAndWatermarks(watermarkStrategy);
|
.assignTimestampsAndWatermarks(watermarkStrategy);
|
||||||
|
|
||||||
OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<>("delayed-chunk") {
|
OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {
|
||||||
};
|
};
|
||||||
|
|
||||||
List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
|
List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
|
||||||
@@ -69,7 +69,7 @@ public class FileChunkCombiner {
|
|||||||
windowStream.getSideOutput(delayedChunkOutputTag)
|
windowStream.getSideOutput(delayedChunkOutputTag)
|
||||||
.map(new SideOutputMapFunction())
|
.map(new SideOutputMapFunction())
|
||||||
.addSink(new HosSink(configuration))
|
.addSink(new HosSink(configuration))
|
||||||
.name("Hos Delayed Chunk");
|
.name("Delayed Chunk");
|
||||||
} else {
|
} else {
|
||||||
windowStream.addSink(new HBaseSink(configuration))
|
windowStream.addSink(new HBaseSink(configuration))
|
||||||
.name("HBase")
|
.name("HBase")
|
||||||
@@ -77,7 +77,7 @@ public class FileChunkCombiner {
|
|||||||
windowStream.getSideOutput(delayedChunkOutputTag)
|
windowStream.getSideOutput(delayedChunkOutputTag)
|
||||||
.map(new SideOutputMapFunction())
|
.map(new SideOutputMapFunction())
|
||||||
.addSink(new HBaseSink(configuration))
|
.addSink(new HBaseSink(configuration))
|
||||||
.name("HBase Delayed Chunk");
|
.name("Delayed Chunk");
|
||||||
}
|
}
|
||||||
|
|
||||||
environment.execute(configuration.get(Configs.FLINK_JOB_NAME));
|
environment.execute(configuration.get(Configs.FLINK_JOB_NAME));
|
||||||
|
|||||||
@@ -22,8 +22,8 @@ import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_SEEK;
|
|||||||
public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {
|
public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {
|
||||||
private static final Log LOG = LogFactory.get();
|
private static final Log LOG = LogFactory.get();
|
||||||
|
|
||||||
public transient Counter duplicateChunkCounter;
|
public transient Counter duplicateChunksCounter;
|
||||||
public transient Counter combineErrorCounter;
|
public transient Counter combineErrorChunksCounter;
|
||||||
private final int fileMaxChunkCount;
|
private final int fileMaxChunkCount;
|
||||||
|
|
||||||
public CombineChunkProcessWindowFunction(int fileMaxChunkCount) {
|
public CombineChunkProcessWindowFunction(int fileMaxChunkCount) {
|
||||||
@@ -34,10 +34,10 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
|
|||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
||||||
duplicateChunkCounter = metricGroup.counter("duplicateChunkCount");
|
duplicateChunksCounter = metricGroup.counter("duplicateChunksCount");
|
||||||
combineErrorCounter = metricGroup.counter("combineErrorCount");
|
combineErrorChunksCounter = metricGroup.counter("combineErrorChunksCount");
|
||||||
metricGroup.meter("duplicateChunkPerSecond", new MeterView(duplicateChunkCounter));
|
metricGroup.meter("numDuplicateChunksInPerSecond", new MeterView(duplicateChunksCounter));
|
||||||
metricGroup.meter("combineErrorPerSecond", new MeterView(combineErrorCounter));
|
metricGroup.meter("numChunksCombineErrorPerSecond", new MeterView(combineErrorChunksCounter));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -50,8 +50,8 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
|
|||||||
|
|
||||||
private List<FileChunk> combine(Iterable<FileChunk> input) {
|
private List<FileChunk> combine(Iterable<FileChunk> input) {
|
||||||
List<FileChunk> combinedFileChunkList = new ArrayList<>();
|
List<FileChunk> combinedFileChunkList = new ArrayList<>();
|
||||||
try {
|
|
||||||
List<FileChunk> originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList());
|
List<FileChunk> originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList());
|
||||||
|
try {
|
||||||
List<byte[]> waitingToCombineChunkList = new ArrayList<>();
|
List<byte[]> waitingToCombineChunkList = new ArrayList<>();
|
||||||
if (COMBINE_MODE_SEEK.equals(originalFileChunkList.get(0).getCombineMode())) {
|
if (COMBINE_MODE_SEEK.equals(originalFileChunkList.get(0).getCombineMode())) {
|
||||||
// 按照offset排序
|
// 按照offset排序
|
||||||
@@ -71,7 +71,7 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
|
|||||||
long actualOffset = currentFileChunk.getOffset();
|
long actualOffset = currentFileChunk.getOffset();
|
||||||
if (expectedOffset > actualOffset) {// 期望offset大于当前offset,该块为重复块,跳过该块
|
if (expectedOffset > actualOffset) {// 期望offset大于当前offset,该块为重复块,跳过该块
|
||||||
duplicateCount++;
|
duplicateCount++;
|
||||||
duplicateChunkCounter.inc();
|
duplicateChunksCounter.inc();
|
||||||
} else if (expectedOffset == actualOffset) {// 期望offset等于当前offset,将该块添加到待合并集合中
|
} else if (expectedOffset == actualOffset) {// 期望offset等于当前offset,将该块添加到待合并集合中
|
||||||
if (currentFileChunk.getLastChunkFlag() == 1) {
|
if (currentFileChunk.getLastChunkFlag() == 1) {
|
||||||
lastChunkFlag = currentFileChunk.getLastChunkFlag();
|
lastChunkFlag = currentFileChunk.getLastChunkFlag();
|
||||||
@@ -129,7 +129,7 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
|
|||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Combiner chunk error.", e);
|
LOG.error("Combiner chunk error.", e);
|
||||||
combineErrorCounter.inc();
|
combineErrorChunksCounter.inc(originalFileChunkList.size());
|
||||||
}
|
}
|
||||||
return combinedFileChunkList;
|
return combinedFileChunkList;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ import org.apache.flink.metrics.MetricGroup;
|
|||||||
public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
|
public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
|
||||||
private final long maxFileSize;
|
private final long maxFileSize;
|
||||||
private final String filterExpression;
|
private final String filterExpression;
|
||||||
public transient Counter filterChunkCounter;
|
public transient Counter filterChunksCounter;
|
||||||
private JexlExpression jexlExpression;
|
private JexlExpression jexlExpression;
|
||||||
private JexlContext jexlContext;
|
private JexlContext jexlContext;
|
||||||
|
|
||||||
@@ -25,8 +25,8 @@ public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
|
|||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
||||||
filterChunkCounter = metricGroup.counter("filterChunkCount");
|
filterChunksCounter = metricGroup.counter("filterChunksCount");
|
||||||
metricGroup.meter("filterChunkPerSecond", new MeterView(filterChunkCounter));
|
metricGroup.meter("numChunksFilterPerSecond", new MeterView(filterChunksCounter));
|
||||||
JexlEngine jexlEngine = new JexlBuilder().create();
|
JexlEngine jexlEngine = new JexlBuilder().create();
|
||||||
jexlExpression = jexlEngine.createExpression(filterExpression);
|
jexlExpression = jexlEngine.createExpression(filterExpression);
|
||||||
jexlContext = new MapContext();
|
jexlContext = new MapContext();
|
||||||
@@ -35,13 +35,13 @@ public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
|
|||||||
@Override
|
@Override
|
||||||
public boolean filter(FileChunk value) {
|
public boolean filter(FileChunk value) {
|
||||||
if (value == null || value.getOffset() > maxFileSize) {
|
if (value == null || value.getOffset() > maxFileSize) {
|
||||||
filterChunkCounter.inc();
|
filterChunksCounter.inc();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (StrUtil.isNotEmpty(filterExpression)) {
|
if (StrUtil.isNotEmpty(filterExpression)) {
|
||||||
jexlContext.set(value.getClass().getSimpleName(), value);
|
jexlContext.set(value.getClass().getSimpleName(), value);
|
||||||
if (!Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
|
if (!Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
|
||||||
filterChunkCounter.inc();
|
filterChunksCounter.inc();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,9 +23,21 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChu
|
|||||||
private final boolean enableRateLimit;
|
private final boolean enableRateLimit;
|
||||||
private final long rateLimitThreshold;
|
private final long rateLimitThreshold;
|
||||||
private final String rateLimitExpression;
|
private final String rateLimitExpression;
|
||||||
public transient Counter parseMessagePackCounter;
|
public transient Counter parseMessagePacksCounter;
|
||||||
public transient Counter parseMessagePackErrorCounter;
|
public transient Counter parseErrorMessagePacksCounter;
|
||||||
public transient Counter rateLimitDropCounter;
|
public transient Counter rateLimitDropChunksCounter;
|
||||||
|
public transient Counter equal0BChunksCounter;
|
||||||
|
public transient Counter lessThan1KBChunksCounter;
|
||||||
|
public transient Counter between1KBAnd5KBChunksCounter;
|
||||||
|
public transient Counter between5KBAnd10KBChunksCounter;
|
||||||
|
public transient Counter between10KBAnd50KBChunksCounter;
|
||||||
|
public transient Counter between50KBAnd100KBChunksCounter;
|
||||||
|
public transient Counter greaterThan100KBChunksCounter;
|
||||||
|
public transient Counter emlChunksCounter;
|
||||||
|
public transient Counter txtChunksCounter;
|
||||||
|
public transient Counter htmlChunksCounter;
|
||||||
|
public transient Counter pcapngChunksCounter;
|
||||||
|
public transient Counter mediaChunksCounter;
|
||||||
private long timestamp;
|
private long timestamp;
|
||||||
private long count;
|
private long count;
|
||||||
private JexlExpression jexlExpression;
|
private JexlExpression jexlExpression;
|
||||||
@@ -41,12 +53,36 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChu
|
|||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
||||||
parseMessagePackCounter = metricGroup.counter("parseMessagePackCount");
|
equal0BChunksCounter = metricGroup.counter("equal0BChunksCount");
|
||||||
parseMessagePackErrorCounter = metricGroup.counter("parseMessagePackErrorCount");
|
lessThan1KBChunksCounter = metricGroup.counter("lessThan1KBChunksCount");
|
||||||
rateLimitDropCounter = metricGroup.counter("rateLimitDropCount");
|
between1KBAnd5KBChunksCounter = metricGroup.counter("between1KBAnd5KBChunksCount");
|
||||||
metricGroup.meter("parseMessagePackPerSecond", new MeterView(parseMessagePackCounter));
|
between5KBAnd10KBChunksCounter = metricGroup.counter("between5KBAnd10KBChunksCount");
|
||||||
metricGroup.meter("parseMessagePackErrorPerSecond", new MeterView(parseMessagePackErrorCounter));
|
between10KBAnd50KBChunksCounter = metricGroup.counter("between10KBAnd50KBChunksCount");
|
||||||
metricGroup.meter("rateLimitDropPerSecond", new MeterView(rateLimitDropCounter));
|
between50KBAnd100KBChunksCounter = metricGroup.counter("between50KBAnd100KBChunksCount");
|
||||||
|
greaterThan100KBChunksCounter = metricGroup.counter("greaterThan100KBChunksCount");
|
||||||
|
metricGroup.meter("numEqual0BChunksInPerSecond", new MeterView(equal0BChunksCounter));
|
||||||
|
metricGroup.meter("numLessThan1KBChunksInPerSecond", new MeterView(lessThan1KBChunksCounter));
|
||||||
|
metricGroup.meter("numBetween1KBAnd5KBChunksInPerSecond", new MeterView(between1KBAnd5KBChunksCounter));
|
||||||
|
metricGroup.meter("numBetween5KBAnd10KBChunksInPerSecond", new MeterView(between5KBAnd10KBChunksCounter));
|
||||||
|
metricGroup.meter("numBetween10KBAnd50KBChunksInPerSecond", new MeterView(between10KBAnd50KBChunksCounter));
|
||||||
|
metricGroup.meter("numBetween50KBAnd100KBChunksInPerSecond", new MeterView(between50KBAnd100KBChunksCounter));
|
||||||
|
metricGroup.meter("numGreaterThan100KBChunksInPerSecond", new MeterView(greaterThan100KBChunksCounter));
|
||||||
|
emlChunksCounter = metricGroup.counter("emlChunksCount");
|
||||||
|
txtChunksCounter = metricGroup.counter("txtChunksCount");
|
||||||
|
htmlChunksCounter = metricGroup.counter("htmlChunksCount");
|
||||||
|
pcapngChunksCounter = metricGroup.counter("pcapngChunksCount");
|
||||||
|
mediaChunksCounter = metricGroup.counter("mediaChunksCount");
|
||||||
|
metricGroup.meter("numEmlChunksInPerSecond", new MeterView(emlChunksCounter));
|
||||||
|
metricGroup.meter("numTxtChunksInPerSecond", new MeterView(txtChunksCounter));
|
||||||
|
metricGroup.meter("numHtmlChunksInPerSecond", new MeterView(htmlChunksCounter));
|
||||||
|
metricGroup.meter("numPcapngChunksInPerSecond", new MeterView(pcapngChunksCounter));
|
||||||
|
metricGroup.meter("numMediaChunksInPerSecond", new MeterView(mediaChunksCounter));
|
||||||
|
parseMessagePacksCounter = metricGroup.counter("parseMessagePacksCount");
|
||||||
|
parseErrorMessagePacksCounter = metricGroup.counter("parseErrorMessagePacksCount");
|
||||||
|
rateLimitDropChunksCounter = metricGroup.counter("rateLimitDropChunksCount");
|
||||||
|
metricGroup.meter("numMessagePacksParsePerSecond", new MeterView(parseMessagePacksCounter));
|
||||||
|
metricGroup.meter("numMessagePacksParseErrorPerSecond", new MeterView(parseErrorMessagePacksCounter));
|
||||||
|
metricGroup.meter("numChunksRateLimitDropPerSecond", new MeterView(rateLimitDropChunksCounter));
|
||||||
timestamp = System.currentTimeMillis();
|
timestamp = System.currentTimeMillis();
|
||||||
count = 0;
|
count = 0;
|
||||||
JexlEngine jexlEngine = new JexlBuilder().create();
|
JexlEngine jexlEngine = new JexlBuilder().create();
|
||||||
@@ -66,7 +102,7 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChu
|
|||||||
return fileChunk;
|
return fileChunk;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rateLimitDropCounter.inc();
|
rateLimitDropChunksCounter.inc();
|
||||||
fileChunk = null;
|
fileChunk = null;
|
||||||
} else if (System.currentTimeMillis() - timestamp >= 1000) {
|
} else if (System.currentTimeMillis() - timestamp >= 1000) {
|
||||||
if (StrUtil.isNotEmpty(rateLimitExpression)) {
|
if (StrUtil.isNotEmpty(rateLimitExpression)) {
|
||||||
@@ -75,7 +111,7 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChu
|
|||||||
return fileChunk;
|
return fileChunk;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rateLimitDropCounter.inc();
|
rateLimitDropChunksCounter.inc();
|
||||||
fileChunk = null;
|
fileChunk = null;
|
||||||
timestamp = System.currentTimeMillis();
|
timestamp = System.currentTimeMillis();
|
||||||
count = 0;
|
count = 0;
|
||||||
@@ -85,7 +121,7 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChu
|
|||||||
}
|
}
|
||||||
|
|
||||||
private FileChunk parseMessagePack(byte[] messagePackData) {
|
private FileChunk parseMessagePack(byte[] messagePackData) {
|
||||||
parseMessagePackCounter.inc();
|
parseMessagePacksCounter.inc();
|
||||||
FileChunk fileChunk;
|
FileChunk fileChunk;
|
||||||
try {
|
try {
|
||||||
fileChunk = new FileChunk();
|
fileChunk = new FileChunk();
|
||||||
@@ -138,11 +174,50 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChu
|
|||||||
if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
|
if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
|
||||||
fileChunk.setLastChunkFlag(0);
|
fileChunk.setLastChunkFlag(0);
|
||||||
}
|
}
|
||||||
|
calculateChunkSize(fileChunk.getLength());
|
||||||
|
calculateFileType(fileChunk.getFileType());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
parseMessagePackErrorCounter.inc();
|
parseErrorMessagePacksCounter.inc();
|
||||||
LOG.error("Parse messagePack failed.", e);
|
LOG.error("Parse messagePack failed.", e);
|
||||||
fileChunk = null;
|
fileChunk = null;
|
||||||
}
|
}
|
||||||
return fileChunk;
|
return fileChunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void calculateChunkSize(long length) {
|
||||||
|
if (length == 0) {
|
||||||
|
equal0BChunksCounter.inc();
|
||||||
|
} else if (length <= 1024) {
|
||||||
|
lessThan1KBChunksCounter.inc();
|
||||||
|
} else if (length <= 5 * 1024) {
|
||||||
|
between1KBAnd5KBChunksCounter.inc();
|
||||||
|
} else if (length <= 10 * 1024) {
|
||||||
|
between5KBAnd10KBChunksCounter.inc();
|
||||||
|
} else if (length <= 50 * 1024) {
|
||||||
|
between10KBAnd50KBChunksCounter.inc();
|
||||||
|
} else if (length <= 100 * 1024) {
|
||||||
|
between50KBAnd100KBChunksCounter.inc();
|
||||||
|
} else {
|
||||||
|
greaterThan100KBChunksCounter.inc();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Increments the per-file-type chunk counter that matches {@code fileType}.
 *
 * <p>{@code "eml"}, {@code "html"}, {@code "txt"} and {@code "pcapng"} each have a
 * dedicated counter; every other value is counted by the media counter.
 *
 * <p>A {@code null} type is also counted by the media counter. A {@code switch} on a
 * null String throws {@link NullPointerException}, and this method is invoked from
 * inside the parser's try block, so without the guard a chunk lacking a file type would
 * be reported as a parse error and discarded purely because of metrics bookkeeping.
 *
 * @param fileType file type reported by the chunk; may be {@code null}
 */
private void calculateFileType(String fileType) {
    if (fileType == null) {
        // No type information: fall into the same bucket as any unrecognised type.
        mediaChunksCounter.inc();
        return;
    }
    switch (fileType) {
        case "eml":
            emlChunksCounter.inc();
            break;
        case "html":
            htmlChunksCounter.inc();
            break;
        case "txt":
            txtChunksCounter.inc();
            break;
        case "pcapng":
            pcapngChunksCounter.inc();
            break;
        default:
            mediaChunksCounter.inc();
            break;
    }
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,19 +11,19 @@ import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_APPEND;
|
|||||||
|
|
||||||
public class SideOutputMapFunction extends RichMapFunction<FileChunk, FileChunk> {
|
public class SideOutputMapFunction extends RichMapFunction<FileChunk, FileChunk> {
|
||||||
|
|
||||||
private transient Counter delayedChunkCounter;
|
public transient Counter delayedChunksCounter;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
||||||
delayedChunkCounter = metricGroup.counter("delayedChunkCount");
|
delayedChunksCounter = metricGroup.counter("delayedChunksCount");
|
||||||
metricGroup.meter("delayedChunkPerSecond", new MeterView(delayedChunkCounter));
|
metricGroup.meter("numChunksDelayPerSecond", new MeterView(delayedChunksCounter));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FileChunk map(FileChunk fileChunk) {
|
public FileChunk map(FileChunk fileChunk) {
|
||||||
delayedChunkCounter.inc();
|
delayedChunksCounter.inc();
|
||||||
fileChunk.setChunkCount(1);
|
fileChunk.setChunkCount(1);
|
||||||
if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
|
if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
|
||||||
fileChunk.setChunkNumbers(fileChunk.getTimestamp() + "-" + fileChunk.getChunk().length + ";");
|
fileChunk.setChunkNumbers(fileChunk.getTimestamp() + "-" + fileChunk.getChunk().length + ";");
|
||||||
|
|||||||
@@ -29,10 +29,16 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
|
|||||||
private static final Log LOG = LogFactory.get();
|
private static final Log LOG = LogFactory.get();
|
||||||
|
|
||||||
private final Configuration configuration;
|
private final Configuration configuration;
|
||||||
public transient Counter sendHBaseCounter;
|
public transient Counter sinkRequestsCounter;
|
||||||
public transient Counter sendHBaseErrorCounter;
|
public transient Counter sinkErrorRequestsCounter;
|
||||||
public transient Counter sendHBaseFileCounter;
|
public transient Counter sinkFilesCounter;
|
||||||
public transient Counter sendHBaseChunkCounter;
|
public transient Counter sinkChunksCounter;
|
||||||
|
public transient Counter lessThan5KBChunksCounter;
|
||||||
|
public transient Counter between5KBAnd10KBChunksCounter;
|
||||||
|
public transient Counter between10KBAnd50KBChunksCounter;
|
||||||
|
public transient Counter between50KBAnd100KBChunksCounter;
|
||||||
|
public transient Counter between100KBAnd1MBChunksCounter;
|
||||||
|
public transient Counter greaterThan1MBChunksCounter;
|
||||||
private boolean isAsync;
|
private boolean isAsync;
|
||||||
private Connection syncHBaseConnection;
|
private Connection syncHBaseConnection;
|
||||||
private AsyncConnection AsyncHBaseConnection;
|
private AsyncConnection AsyncHBaseConnection;
|
||||||
@@ -58,14 +64,27 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
|
|||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
||||||
sendHBaseCounter = metricGroup.counter("sendHBaseCount");
|
lessThan5KBChunksCounter = metricGroup.counter("lessThan5KBChunksCount");
|
||||||
sendHBaseErrorCounter = metricGroup.counter("sendHBaseErrorCount");
|
between5KBAnd10KBChunksCounter = metricGroup.counter("between5KBAnd10KBChunksCount");
|
||||||
sendHBaseFileCounter = metricGroup.counter("sendHBaseFileCount");
|
between10KBAnd50KBChunksCounter = metricGroup.counter("between10KBAnd50KBChunksCount");
|
||||||
sendHBaseChunkCounter = metricGroup.counter("sendHBaseChunkCount");
|
between50KBAnd100KBChunksCounter = metricGroup.counter("between50KBAnd100KBChunksCount");
|
||||||
metricGroup.meter("sendHBasePerSecond", new MeterView(sendHBaseCounter, 5));
|
between100KBAnd1MBChunksCounter = metricGroup.counter("between100KBAnd1MBChunksCount");
|
||||||
metricGroup.meter("sendHBaseErrorPerSecond", new MeterView(sendHBaseErrorCounter));
|
greaterThan1MBChunksCounter = metricGroup.counter("greaterThan1MBChunksCount");
|
||||||
metricGroup.meter("sendHBaseFilePerSecond", new MeterView(sendHBaseFileCounter));
|
metricGroup.meter("numLessThan5KBChunksOutPerSecond", new MeterView(lessThan5KBChunksCounter));
|
||||||
metricGroup.meter("sendHBaseChunkPerSecond", new MeterView(sendHBaseChunkCounter));
|
metricGroup.meter("numBetween5KBAnd10KBChunksOutPerSecond", new MeterView(between5KBAnd10KBChunksCounter));
|
||||||
|
metricGroup.meter("numBetween10KBAnd50KBChunksOutPerSecond", new MeterView(between10KBAnd50KBChunksCounter));
|
||||||
|
// Per-second rate for the 50KB-100KB output size bucket; the name follows the
// num<Bucket>ChunksOutPerSecond pattern used by the neighbouring size-bucket meters.
metricGroup.meter("numBetween50KBAnd100KBChunksOutPerSecond", new MeterView(between50KBAnd100KBChunksCounter));
|
||||||
|
metricGroup.meter("numBetween100KBAnd1MBChunksOutPerSecond", new MeterView(between100KBAnd1MBChunksCounter));
|
||||||
|
metricGroup.meter("numGreaterThan1MBChunksOutPerSecond", new MeterView(greaterThan1MBChunksCounter));
|
||||||
|
sinkRequestsCounter = metricGroup.counter("sinkRequestsCount");
|
||||||
|
sinkErrorRequestsCounter = metricGroup.counter("sinkErrorRequestsCount");
|
||||||
|
sinkFilesCounter = metricGroup.counter("sinkFilesCount");
|
||||||
|
sinkChunksCounter = metricGroup.counter("sinkChunksCount");
|
||||||
|
metricGroup.meter("numRequestsSinkPerSecond", new MeterView(sinkRequestsCounter, 5));
|
||||||
|
metricGroup.meter("numErrorRequestsSinkPerSecond", new MeterView(sinkErrorRequestsCounter));
|
||||||
|
metricGroup.meter("numFilesSinkPerSecond", new MeterView(sinkFilesCounter));
|
||||||
|
metricGroup.meter("numChunksSinkPerSecond", new MeterView(sinkChunksCounter));
|
||||||
|
|
||||||
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
|
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
|
||||||
if (isAsync) {
|
if (isAsync) {
|
||||||
AsyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection();
|
AsyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection();
|
||||||
@@ -90,16 +109,17 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
|
|||||||
@Override
|
@Override
|
||||||
public void invoke(FileChunk fileChunk, Context context) {
|
public void invoke(FileChunk fileChunk, Context context) {
|
||||||
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode()) && configuration.get(Configs.SINK_BATCH)) {
|
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode()) && configuration.get(Configs.SINK_BATCH)) {
|
||||||
sendHBaseChunkCounter.inc();
|
sinkChunksCounter.inc();
|
||||||
byte[] data = "".getBytes();
|
byte[] data = "".getBytes();
|
||||||
if (fileChunk.getChunk() != null) {
|
if (fileChunk.getChunk() != null) {
|
||||||
data = fileChunk.getChunk();
|
data = fileChunk.getChunk();
|
||||||
}
|
}
|
||||||
|
int chunkLength = data.length;
|
||||||
long timestamp = System.currentTimeMillis();
|
long timestamp = System.currentTimeMillis();
|
||||||
Map<String, String> partMessageMap = new HashMap<>();
|
Map<String, String> partMessageMap = new HashMap<>();
|
||||||
partMessageMap.put(APPEND_FILE_PART_MESSAGE_CHUNK_COUNT, fileChunk.getChunkCount() + "");
|
partMessageMap.put(APPEND_FILE_PART_MESSAGE_CHUNK_COUNT, fileChunk.getChunkCount() + "");
|
||||||
partMessageMap.put(APPEND_FILE_PART_MESSAGE_LAST_PART_FLAG, fileChunk.getLastChunkFlag() + "");
|
partMessageMap.put(APPEND_FILE_PART_MESSAGE_LAST_PART_FLAG, fileChunk.getLastChunkFlag() + "");
|
||||||
partMessageMap.put(APPEND_FILE_PART_MESSAGE_SIZE, data.length + "");
|
partMessageMap.put(APPEND_FILE_PART_MESSAGE_SIZE, chunkLength + "");
|
||||||
Put dataPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid()) + PublicConstants.FILE_DATA_ROW_SUFFIX));
|
Put dataPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid()) + PublicConstants.FILE_DATA_ROW_SUFFIX));
|
||||||
dataPut.addColumn(BYTE_FAMILY_DATA, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), data);
|
dataPut.addColumn(BYTE_FAMILY_DATA, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), data);
|
||||||
dataPut.addColumn(BYTE_FAMILY_META, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), Bytes.toBytes(partMessageMap.toString()));
|
dataPut.addColumn(BYTE_FAMILY_META, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), Bytes.toBytes(partMessageMap.toString()));
|
||||||
@@ -126,25 +146,26 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
|
|||||||
Put indexFilenamePut = new Put(Bytes.toBytes(indexFilenameKey));
|
Put indexFilenamePut = new Put(Bytes.toBytes(indexFilenameKey));
|
||||||
indexFilenamePut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_FILENAME, Bytes.toBytes(fileChunk.getUuid()));
|
indexFilenamePut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_FILENAME, Bytes.toBytes(fileChunk.getUuid()));
|
||||||
indexFilenamePutList.add(indexFilenamePut);
|
indexFilenamePutList.add(indexFilenamePut);
|
||||||
sendHBaseFileCounter.inc();
|
sinkFilesCounter.inc();
|
||||||
} else {
|
} else {
|
||||||
Put metaPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid())));
|
Put metaPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid())));
|
||||||
metaPut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_LAST_MODIFIED, Bytes.toBytes(timestamp));
|
metaPut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_LAST_MODIFIED, Bytes.toBytes(timestamp));
|
||||||
dataPutList.add(metaPut);
|
dataPutList.add(metaPut);
|
||||||
}
|
}
|
||||||
chunkCount++;
|
chunkCount++;
|
||||||
chunkSize += data.length;
|
chunkSize += chunkLength;
|
||||||
|
calculateChunkSize(chunkLength);
|
||||||
if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
|
if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
|
||||||
if (isAsync) {
|
if (isAsync) {
|
||||||
if (dataPutList.size() > 0) {
|
if (dataPutList.size() > 0) {
|
||||||
List<CompletableFuture<Object>> futures = asyncTable.batch(dataPutList);
|
List<CompletableFuture<Object>> futures = asyncTable.batch(dataPutList);
|
||||||
sendHBaseCounter.inc();
|
sinkRequestsCounter.inc();
|
||||||
CompletableFuture.supplyAsync(() -> {
|
CompletableFuture.supplyAsync(() -> {
|
||||||
for (CompletableFuture<Object> completableFuture : futures) {
|
for (CompletableFuture<Object> completableFuture : futures) {
|
||||||
completableFuture.whenCompleteAsync((result, error) -> {
|
completableFuture.whenCompleteAsync((result, error) -> {
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
LOG.error("put chunk to hbase error. ", error.getMessage());
|
LOG.error("put chunk to hbase error. ", error.getMessage());
|
||||||
sendHBaseErrorCounter.inc();
|
sinkErrorRequestsCounter.inc();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -154,44 +175,44 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
|
|||||||
}
|
}
|
||||||
if (indexTimePutList.size() > 0) {
|
if (indexTimePutList.size() > 0) {
|
||||||
asyncIndexTimeTable.batch(indexTimePutList);
|
asyncIndexTimeTable.batch(indexTimePutList);
|
||||||
sendHBaseCounter.inc();
|
sinkRequestsCounter.inc();
|
||||||
indexTimePutList.clear();
|
indexTimePutList.clear();
|
||||||
}
|
}
|
||||||
if (indexFilenamePutList.size() > 0) {
|
if (indexFilenamePutList.size() > 0) {
|
||||||
asyncIndexFilenameTable.batch(indexFilenamePutList);
|
asyncIndexFilenameTable.batch(indexFilenamePutList);
|
||||||
sendHBaseCounter.inc();
|
sinkRequestsCounter.inc();
|
||||||
indexFilenamePutList.clear();
|
indexFilenamePutList.clear();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (dataPutList.size() > 0) {
|
if (dataPutList.size() > 0) {
|
||||||
try {
|
try {
|
||||||
sendHBaseCounter.inc();
|
sinkRequestsCounter.inc();
|
||||||
table.batch(dataPutList, null);
|
table.batch(dataPutList, null);
|
||||||
} catch (IOException | InterruptedException e) {
|
} catch (IOException | InterruptedException e) {
|
||||||
LOG.error("put chunk to hbase data table error. ", e.getMessage());
|
LOG.error("put chunk to hbase data table error. ", e.getMessage());
|
||||||
sendHBaseErrorCounter.inc();
|
sinkErrorRequestsCounter.inc();
|
||||||
} finally {
|
} finally {
|
||||||
dataPutList.clear();
|
dataPutList.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (indexTimePutList.size() > 0) {
|
if (indexTimePutList.size() > 0) {
|
||||||
try {
|
try {
|
||||||
sendHBaseCounter.inc();
|
sinkRequestsCounter.inc();
|
||||||
indexTimeTable.batch(indexTimePutList, null);
|
indexTimeTable.batch(indexTimePutList, null);
|
||||||
} catch (IOException | InterruptedException e) {
|
} catch (IOException | InterruptedException e) {
|
||||||
LOG.error("put chunk to hbase index time table error. ", e.getMessage());
|
LOG.error("put chunk to hbase index time table error. ", e.getMessage());
|
||||||
sendHBaseErrorCounter.inc();
|
sinkErrorRequestsCounter.inc();
|
||||||
} finally {
|
} finally {
|
||||||
indexTimePutList.clear();
|
indexTimePutList.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (indexFilenamePutList.size() > 0) {
|
if (indexFilenamePutList.size() > 0) {
|
||||||
try {
|
try {
|
||||||
sendHBaseCounter.inc();
|
sinkRequestsCounter.inc();
|
||||||
indexFilenameTable.batch(indexFilenamePutList, null);
|
indexFilenameTable.batch(indexFilenamePutList, null);
|
||||||
} catch (IOException | InterruptedException e) {
|
} catch (IOException | InterruptedException e) {
|
||||||
LOG.error("put chunk to hbase index filename table error. ", e.getMessage());
|
LOG.error("put chunk to hbase index filename table error. ", e.getMessage());
|
||||||
sendHBaseErrorCounter.inc();
|
sinkErrorRequestsCounter.inc();
|
||||||
} finally {
|
} finally {
|
||||||
indexFilenamePutList.clear();
|
indexFilenamePutList.clear();
|
||||||
}
|
}
|
||||||
@@ -211,4 +232,20 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
|
|||||||
IoUtil.close(syncHBaseConnection);
|
IoUtil.close(syncHBaseConnection);
|
||||||
IoUtil.close(AsyncHBaseConnection);
|
IoUtil.close(AsyncHBaseConnection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void calculateChunkSize(long length) {
|
||||||
|
if (length <= 5 * 1024) {
|
||||||
|
lessThan5KBChunksCounter.inc();
|
||||||
|
} else if (length <= 10 * 1024) {
|
||||||
|
between5KBAnd10KBChunksCounter.inc();
|
||||||
|
} else if (length <= 50 * 1024) {
|
||||||
|
between10KBAnd50KBChunksCounter.inc();
|
||||||
|
} else if (length <= 100 * 1024) {
|
||||||
|
between50KBAnd100KBChunksCounter.inc();
|
||||||
|
} else if (length <= 1024 * 1024) {
|
||||||
|
between100KBAnd1MBChunksCounter.inc();
|
||||||
|
} else {
|
||||||
|
greaterThan1MBChunksCounter.inc();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,10 +36,16 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
|||||||
private static final Log LOG = LogFactory.get();
|
private static final Log LOG = LogFactory.get();
|
||||||
|
|
||||||
private final Configuration configuration;
|
private final Configuration configuration;
|
||||||
public transient Counter sendHosCounter;
|
public transient Counter sinkRequestsCounter;
|
||||||
public transient Counter sendHosErrorCounter;
|
public transient Counter sinkErrorRequestsCounter;
|
||||||
public transient Counter sendHosFileCounter;
|
public transient Counter sinkFilesCounter;
|
||||||
public transient Counter sendHosChunkCounter;
|
public transient Counter sinkChunksCounter;
|
||||||
|
public transient Counter lessThan5KBChunksCounter;
|
||||||
|
public transient Counter between5KBAnd10KBChunksCounter;
|
||||||
|
public transient Counter between10KBAnd50KBChunksCounter;
|
||||||
|
public transient Counter between50KBAnd100KBChunksCounter;
|
||||||
|
public transient Counter between100KBAnd1MBChunksCounter;
|
||||||
|
public transient Counter greaterThan1MBChunksCounter;
|
||||||
private boolean isAsync;
|
private boolean isAsync;
|
||||||
private CloseableHttpClient syncHttpClient;
|
private CloseableHttpClient syncHttpClient;
|
||||||
private CloseableHttpAsyncClient asyncHttpClient;
|
private CloseableHttpAsyncClient asyncHttpClient;
|
||||||
@@ -66,14 +72,27 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
|||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
|
||||||
sendHosCounter = metricGroup.counter("sendHosCount");
|
lessThan5KBChunksCounter = metricGroup.counter("lessThan5KBChunksCount");
|
||||||
sendHosErrorCounter = metricGroup.counter("sendHosErrorCount");
|
between5KBAnd10KBChunksCounter = metricGroup.counter("between5KBAnd10KBChunksCount");
|
||||||
sendHosFileCounter = metricGroup.counter("sendHosFileCount");
|
between10KBAnd50KBChunksCounter = metricGroup.counter("between10KBAnd50KBChunksCount");
|
||||||
sendHosChunkCounter = metricGroup.counter("sendHosChunkCount");
|
between50KBAnd100KBChunksCounter = metricGroup.counter("between50KBAnd100KBChunksCount");
|
||||||
metricGroup.meter("sendHosPerSecond", new MeterView(sendHosCounter, 5));
|
between100KBAnd1MBChunksCounter = metricGroup.counter("between100KBAnd1MBChunksCount");
|
||||||
metricGroup.meter("sendHosErrorPerSecond", new MeterView(sendHosErrorCounter));
|
greaterThan1MBChunksCounter = metricGroup.counter("greaterThan1MBChunksCount");
|
||||||
metricGroup.meter("sendHosFilePerSecond", new MeterView(sendHosFileCounter));
|
metricGroup.meter("numLessThan5KBChunksOutPerSecond", new MeterView(lessThan5KBChunksCounter));
|
||||||
metricGroup.meter("sendHosChunkPerSecond", new MeterView(sendHosChunkCounter));
|
metricGroup.meter("numBetween5KBAnd10KBChunksOutPerSecond", new MeterView(between5KBAnd10KBChunksCounter));
|
||||||
|
metricGroup.meter("numBetween10KBAnd50KBChunksOutPerSecond", new MeterView(between10KBAnd50KBChunksCounter));
|
||||||
|
metricGroup.meter("numBetween50KBAnd100KBChunkPsOuterSecond", new MeterView(between50KBAnd100KBChunksCounter));
|
||||||
|
metricGroup.meter("numBetween100KBAnd1MBChunksOutPerSecond", new MeterView(between100KBAnd1MBChunksCounter));
|
||||||
|
metricGroup.meter("numGreaterThan1MBChunksOutPerSecond", new MeterView(greaterThan1MBChunksCounter));
|
||||||
|
sinkRequestsCounter = metricGroup.counter("sinkRequestsCount");
|
||||||
|
sinkErrorRequestsCounter = metricGroup.counter("sinkErrorRequestsCount");
|
||||||
|
sinkFilesCounter = metricGroup.counter("sinkFilesCount");
|
||||||
|
sinkChunksCounter = metricGroup.counter("sinkChunksCount");
|
||||||
|
metricGroup.meter("numRequestsSinkPerSecond", new MeterView(sinkRequestsCounter, 5));
|
||||||
|
metricGroup.meter("numErrorRequestsSinkPerSecond", new MeterView(sinkErrorRequestsCounter));
|
||||||
|
metricGroup.meter("numFilesSinkPerSecond", new MeterView(sinkFilesCounter));
|
||||||
|
metricGroup.meter("numChunksSinkPerSecond", new MeterView(sinkChunksCounter));
|
||||||
|
|
||||||
loadBalanceMode = configuration.getInteger(Configs.SINK_HOS_LOAD_BALANCE_MODE);
|
loadBalanceMode = configuration.getInteger(Configs.SINK_HOS_LOAD_BALANCE_MODE);
|
||||||
if (loadBalanceMode == 0) {
|
if (loadBalanceMode == 0) {
|
||||||
endpoint = configuration.getString(Configs.SINK_HOS_ENDPOINT);
|
endpoint = configuration.getString(Configs.SINK_HOS_ENDPOINT);
|
||||||
@@ -106,7 +125,8 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
|||||||
if (fileChunk.getChunk() != null) {
|
if (fileChunk.getChunk() != null) {
|
||||||
data = fileChunk.getChunk();
|
data = fileChunk.getChunk();
|
||||||
}
|
}
|
||||||
sendHosChunkCounter.inc();
|
long chunkLength = data.length;
|
||||||
|
sinkChunksCounter.inc();
|
||||||
if (configuration.get(Configs.SINK_BATCH)) {
|
if (configuration.get(Configs.SINK_BATCH)) {
|
||||||
hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType());
|
hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType());
|
||||||
hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid());
|
hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid());
|
||||||
@@ -114,7 +134,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
|||||||
hosMessage.put(HOS_OFFSET, fileChunk.getOffset() + "");
|
hosMessage.put(HOS_OFFSET, fileChunk.getOffset() + "");
|
||||||
hosMessage.put(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
|
hosMessage.put(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
|
||||||
if (fileChunk.getOffset() == 0) {
|
if (fileChunk.getOffset() == 0) {
|
||||||
sendHosFileCounter.inc();
|
sinkFilesCounter.inc();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
hosMessage.put(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
|
hosMessage.put(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
|
||||||
@@ -129,10 +149,11 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
|||||||
}
|
}
|
||||||
objectsMeta += hosMessage.toString() + ";";
|
objectsMeta += hosMessage.toString() + ";";
|
||||||
hosMessage.clear();
|
hosMessage.clear();
|
||||||
objectsOffset += data.length + ";";
|
objectsOffset += chunkLength + ";";
|
||||||
byteList.add(data);
|
byteList.add(data);
|
||||||
chunkCount++;
|
chunkCount++;
|
||||||
chunkSize += data.length;
|
chunkSize += chunkLength;
|
||||||
|
calculateChunkSize(chunkLength);
|
||||||
if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
|
if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
|
||||||
HttpPut httpPut = new HttpPut(bathPutUrl);
|
HttpPut httpPut = new HttpPut(bathPutUrl);
|
||||||
httpPut.setHeader(TOKEN, token);
|
httpPut.setHeader(TOKEN, token);
|
||||||
@@ -170,7 +191,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
|||||||
httpPut.setHeader(HOS_OFFSET, fileChunk.getOffset() + "");
|
httpPut.setHeader(HOS_OFFSET, fileChunk.getOffset() + "");
|
||||||
httpPut.setHeader(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
|
httpPut.setHeader(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
|
||||||
if (fileChunk.getOffset() == 0) {
|
if (fileChunk.getOffset() == 0) {
|
||||||
sendHosFileCounter.inc();
|
sinkFilesCounter.inc();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
httpPut.setHeader(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
|
httpPut.setHeader(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
|
||||||
@@ -184,6 +205,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
httpPut.setEntity(new ByteArrayEntity(data));
|
httpPut.setEntity(new ByteArrayEntity(data));
|
||||||
|
calculateChunkSize(chunkLength);
|
||||||
executeRequest(httpPut);
|
executeRequest(httpPut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -195,7 +217,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void executeRequest(HttpPut httpPut) {
|
private void executeRequest(HttpPut httpPut) {
|
||||||
sendHosCounter.inc();
|
sinkRequestsCounter.inc();
|
||||||
if (isAsync) {
|
if (isAsync) {
|
||||||
asyncHttpClient.execute(httpPut, new FutureCallback<HttpResponse>() {
|
asyncHttpClient.execute(httpPut, new FutureCallback<HttpResponse>() {
|
||||||
@Override
|
@Override
|
||||||
@@ -204,18 +226,18 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
|||||||
if (httpResponse.getStatusLine().getStatusCode() != 200) {
|
if (httpResponse.getStatusLine().getStatusCode() != 200) {
|
||||||
String responseEntity = EntityUtils.toString(httpResponse.getEntity(), CharEncoding.UTF_8);
|
String responseEntity = EntityUtils.toString(httpResponse.getEntity(), CharEncoding.UTF_8);
|
||||||
LOG.error("put part to hos error. code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
|
LOG.error("put part to hos error. code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
|
||||||
sendHosErrorCounter.inc();
|
sinkErrorRequestsCounter.inc();
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("put part to hos error.", e);
|
LOG.error("put part to hos error.", e);
|
||||||
sendHosErrorCounter.inc();
|
sinkErrorRequestsCounter.inc();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(Exception ex) {
|
public void failed(Exception ex) {
|
||||||
LOG.error("put part to hos error.", ex);
|
LOG.error("put part to hos error.", ex);
|
||||||
sendHosErrorCounter.inc();
|
sinkErrorRequestsCounter.inc();
|
||||||
if (loadBalanceMode == 1 && ex instanceof ConnectException) {
|
if (loadBalanceMode == 1 && ex instanceof ConnectException) {
|
||||||
endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
|
endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
|
||||||
bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
|
bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
|
||||||
@@ -234,11 +256,11 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
|||||||
if (response.getStatusLine().getStatusCode() != 200) {
|
if (response.getStatusLine().getStatusCode() != 200) {
|
||||||
String responseEntity = EntityUtils.toString(response.getEntity(), CharEncoding.UTF_8);
|
String responseEntity = EntityUtils.toString(response.getEntity(), CharEncoding.UTF_8);
|
||||||
LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
|
LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
|
||||||
sendHosErrorCounter.inc();
|
sinkErrorRequestsCounter.inc();
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("put part to hos error.", e);
|
LOG.error("put part to hos error.", e);
|
||||||
sendHosErrorCounter.inc();
|
sinkErrorRequestsCounter.inc();
|
||||||
if (loadBalanceMode == 1 && (e instanceof HttpHostConnectException || e instanceof ConnectTimeoutException)) {
|
if (loadBalanceMode == 1 && (e instanceof HttpHostConnectException || e instanceof ConnectTimeoutException)) {
|
||||||
endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
|
endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
|
||||||
}
|
}
|
||||||
@@ -247,4 +269,20 @@ public class HosSink extends RichSinkFunction<FileChunk> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void calculateChunkSize(long length) {
|
||||||
|
if (length <= 5 * 1024) {
|
||||||
|
lessThan5KBChunksCounter.inc();
|
||||||
|
} else if (length <= 10 * 1024) {
|
||||||
|
between5KBAnd10KBChunksCounter.inc();
|
||||||
|
} else if (length <= 50 * 1024) {
|
||||||
|
between10KBAnd50KBChunksCounter.inc();
|
||||||
|
} else if (length <= 100 * 1024) {
|
||||||
|
between50KBAnd100KBChunksCounter.inc();
|
||||||
|
} else if (length <= 1024 * 1024) {
|
||||||
|
between100KBAnd1MBChunksCounter.inc();
|
||||||
|
} else {
|
||||||
|
greaterThan1MBChunksCounter.inc();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -53,6 +53,8 @@ import java.time.Duration;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
|
import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_APPEND;
|
||||||
|
|
||||||
public class FileChunkCombinerTests {
|
public class FileChunkCombinerTests {
|
||||||
private File emlFile;
|
private File emlFile;
|
||||||
private byte[] emlFileBytes;
|
private byte[] emlFileBytes;
|
||||||
@@ -106,7 +108,8 @@ public class FileChunkCombinerTests {
|
|||||||
triggers.add(LastChunkOrNoDataInTimeTrigger.of(1000));
|
triggers.add(LastChunkOrNoDataInTimeTrigger.of(1000));
|
||||||
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
|
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
|
||||||
processWindowFunction = new CombineChunkProcessWindowFunction(Integer.MAX_VALUE);
|
processWindowFunction = new CombineChunkProcessWindowFunction(Integer.MAX_VALUE);
|
||||||
delayedChunkOutputTag = new OutputTag<>("delayed-chunk") {};
|
delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {
|
||||||
|
};
|
||||||
DataStreamSource<FileChunk> source = env.fromCollection(inputFileChunks);
|
DataStreamSource<FileChunk> source = env.fromCollection(inputFileChunks);
|
||||||
DataStream<FileChunk> window = source
|
DataStream<FileChunk> window = source
|
||||||
.keyBy(new FileChunkKeySelector())
|
.keyBy(new FileChunkKeySelector())
|
||||||
@@ -122,7 +125,7 @@ public class FileChunkCombinerTests {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testParseMessagePackMapFunction() throws Exception {
|
public void testParseMessagePackMapFunction() throws Exception {
|
||||||
ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(false, Long.MAX_VALUE,"");
|
ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(false, Long.MAX_VALUE, "");
|
||||||
OneInputStreamOperatorTestHarness<byte[], FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(mapFunction));
|
OneInputStreamOperatorTestHarness<byte[], FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(mapFunction));
|
||||||
testHarness.setup();
|
testHarness.setup();
|
||||||
testHarness.open();
|
testHarness.open();
|
||||||
@@ -140,9 +143,49 @@ public class FileChunkCombinerTests {
|
|||||||
StreamRecord sr1 = (StreamRecord) o2;
|
StreamRecord sr1 = (StreamRecord) o2;
|
||||||
return ((FileChunk) sr0.getValue()).getUuid().compareTo(((FileChunk) sr1.getValue()).getUuid());
|
return ((FileChunk) sr0.getValue()).getUuid().compareTo(((FileChunk) sr1.getValue()).getUuid());
|
||||||
});
|
});
|
||||||
Assert.assertEquals(30, mapFunction.parseMessagePackCounter.getCount());
|
Assert.assertEquals(30, mapFunction.parseMessagePacksCounter.getCount());
|
||||||
Assert.assertEquals(0, mapFunction.parseMessagePackErrorCounter.getCount());
|
Assert.assertEquals(0, mapFunction.parseErrorMessagePacksCounter.getCount());
|
||||||
Assert.assertEquals(0, mapFunction.rateLimitDropCounter.getCount());
|
Assert.assertEquals(0, mapFunction.rateLimitDropChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(21, mapFunction.equal0BChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(1, mapFunction.lessThan1KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(8, mapFunction.between1KBAnd5KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, mapFunction.between5KBAnd10KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, mapFunction.between10KBAnd50KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, mapFunction.between50KBAnd100KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, mapFunction.greaterThan100KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(10, mapFunction.emlChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(20, mapFunction.pcapngChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, mapFunction.txtChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, mapFunction.htmlChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, mapFunction.mediaChunksCounter.getCount());
|
||||||
|
testHarness.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSideOutputMapFunction() throws Exception {
|
||||||
|
SideOutputMapFunction sideOutputMapFunction = new SideOutputMapFunction();
|
||||||
|
OneInputStreamOperatorTestHarness<FileChunk, FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(sideOutputMapFunction));
|
||||||
|
testHarness.setup();
|
||||||
|
testHarness.open();
|
||||||
|
for (FileChunk fileChunk : inputFileChunks) {
|
||||||
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
|
}
|
||||||
|
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
|
||||||
|
for (FileChunk fileChunk : inputFileChunks) {
|
||||||
|
fileChunk.setChunkCount(1);
|
||||||
|
if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
|
||||||
|
fileChunk.setChunkNumbers(fileChunk.getTimestamp() + "-" + fileChunk.getChunk().length + ";");
|
||||||
|
}
|
||||||
|
expectedOutput.add(new StreamRecord<>(fileChunk));
|
||||||
|
}
|
||||||
|
ConcurrentLinkedQueue<Object> actualOutput = testHarness.getOutput();
|
||||||
|
Assert.assertEquals(30, actualOutput.size());
|
||||||
|
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, (o1, o2) -> {
|
||||||
|
StreamRecord sr0 = (StreamRecord) o1;
|
||||||
|
StreamRecord sr1 = (StreamRecord) o2;
|
||||||
|
return ((FileChunk) sr0.getValue()).getUuid().compareTo(((FileChunk) sr1.getValue()).getUuid());
|
||||||
|
});
|
||||||
|
Assert.assertEquals(30, sideOutputMapFunction.delayedChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -170,7 +213,7 @@ public class FileChunkCombinerTests {
|
|||||||
return ((FileChunk) sr0.getValue()).getUuid().compareTo(((FileChunk) sr1.getValue()).getUuid());
|
return ((FileChunk) sr0.getValue()).getUuid().compareTo(((FileChunk) sr1.getValue()).getUuid());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Assert.assertEquals(20, fileChunkFilterFunction.filterChunkCounter.getCount());
|
Assert.assertEquals(20, fileChunkFilterFunction.filterChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -189,8 +232,8 @@ public class FileChunkCombinerTests {
|
|||||||
List<Object> actualOutput = new ArrayList<>(testHarness.extractOutputValues());
|
List<Object> actualOutput = new ArrayList<>(testHarness.extractOutputValues());
|
||||||
Assert.assertEquals(3, actualOutput.size());
|
Assert.assertEquals(3, actualOutput.size());
|
||||||
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, Comparator.comparing(o -> ((FileChunk) o).getUuid()));
|
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, Comparator.comparing(o -> ((FileChunk) o).getUuid()));
|
||||||
Assert.assertEquals(0, processWindowFunction.combineErrorCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, processWindowFunction.duplicateChunkCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -223,8 +266,8 @@ public class FileChunkCombinerTests {
|
|||||||
}
|
}
|
||||||
Assert.assertEquals(10, sideOutput.size());
|
Assert.assertEquals(10, sideOutput.size());
|
||||||
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedSideOutput, actualSideOutput, Comparator.comparing(o -> ((FileChunk) o).getUuid()));
|
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedSideOutput, actualSideOutput, Comparator.comparing(o -> ((FileChunk) o).getUuid()));
|
||||||
Assert.assertEquals(0, processWindowFunction.combineErrorCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, processWindowFunction.duplicateChunkCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -251,8 +294,8 @@ public class FileChunkCombinerTests {
|
|||||||
Assert.assertEquals(inputFiles.get(0), actualOutput.get(0));
|
Assert.assertEquals(inputFiles.get(0), actualOutput.get(0));
|
||||||
Assert.assertEquals(inputFiles.get(1).getChunk().length + pcapngFileChunks.get(5).getChunk().length, actualOutput.get(1).getChunk().length);
|
Assert.assertEquals(inputFiles.get(1).getChunk().length + pcapngFileChunks.get(5).getChunk().length, actualOutput.get(1).getChunk().length);
|
||||||
Assert.assertEquals(inputFiles.get(2).getChunk().length + pcapngIncludeMetaFileChunks.get(5).getChunk().length, actualOutput.get(2).getChunk().length);
|
Assert.assertEquals(inputFiles.get(2).getChunk().length + pcapngIncludeMetaFileChunks.get(5).getChunk().length, actualOutput.get(2).getChunk().length);
|
||||||
Assert.assertEquals(0, processWindowFunction.combineErrorCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount());
|
||||||
Assert.assertEquals(1, processWindowFunction.duplicateChunkCounter.getCount());
|
Assert.assertEquals(1, processWindowFunction.duplicateChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -279,8 +322,8 @@ public class FileChunkCombinerTests {
|
|||||||
Assert.assertEquals(inputFiles.get(0).getChunk().length - emlFileChunks.get(5).getChunk().length, actualOutput.get(0).getChunk().length + actualOutput.get(1).getChunk().length);
|
Assert.assertEquals(inputFiles.get(0).getChunk().length - emlFileChunks.get(5).getChunk().length, actualOutput.get(0).getChunk().length + actualOutput.get(1).getChunk().length);
|
||||||
Assert.assertEquals(inputFiles.get(1).getChunk().length - pcapngFileChunks.get(5).getChunk().length, actualOutput.get(2).getChunk().length);
|
Assert.assertEquals(inputFiles.get(1).getChunk().length - pcapngFileChunks.get(5).getChunk().length, actualOutput.get(2).getChunk().length);
|
||||||
Assert.assertEquals(inputFiles.get(2).getChunk().length - pcapngIncludeMetaFileChunks.get(5).getChunk().length, actualOutput.get(3).getChunk().length);
|
Assert.assertEquals(inputFiles.get(2).getChunk().length - pcapngIncludeMetaFileChunks.get(5).getChunk().length, actualOutput.get(3).getChunk().length);
|
||||||
Assert.assertEquals(0, processWindowFunction.combineErrorCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.combineErrorChunksCounter.getCount());
|
||||||
Assert.assertEquals(0, processWindowFunction.duplicateChunkCounter.getCount());
|
Assert.assertEquals(0, processWindowFunction.duplicateChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -298,20 +341,30 @@ public class FileChunkCombinerTests {
|
|||||||
//seek文件
|
//seek文件
|
||||||
FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
|
FileChunk fileChunk = new FileChunk("0000000001", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
|
||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
Assert.assertEquals(1, hosSink.sendHosCounter.getCount());
|
Assert.assertEquals(1, hosSink.sinkRequestsCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.sendHosErrorCounter.getCount());
|
Assert.assertEquals(0, hosSink.sinkErrorRequestsCounter.getCount());
|
||||||
Assert.assertEquals(1, hosSink.sendHosFileCounter.getCount());
|
Assert.assertEquals(1, hosSink.sinkFilesCounter.getCount());
|
||||||
Assert.assertEquals(1, hosSink.sendHosChunkCounter.getCount());
|
Assert.assertEquals(1, hosSink.sinkChunksCounter.getCount());
|
||||||
//append文件
|
//append文件
|
||||||
fileChunk = new FileChunk("0000000002", "pcapng", data.length, data, "append", 5, System.currentTimeMillis(), pcapngFileMeta, "1-200,2-200,3-200,4-200,5-200");
|
fileChunk = new FileChunk("0000000002", "pcapng", data.length, data, "append", 5, System.currentTimeMillis(), pcapngFileMeta, "1-200,2-200,3-200,4-200,5-200");
|
||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
Assert.assertEquals(2, hosSink.sendHosCounter.getCount());
|
Assert.assertEquals(2, hosSink.sinkRequestsCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.sendHosErrorCounter.getCount());
|
Assert.assertEquals(0, hosSink.sinkErrorRequestsCounter.getCount());
|
||||||
Assert.assertEquals(1, hosSink.sendHosFileCounter.getCount());
|
Assert.assertEquals(1, hosSink.sinkFilesCounter.getCount());
|
||||||
Assert.assertEquals(2, hosSink.sendHosChunkCounter.getCount());
|
Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount());
|
||||||
testHarness.close();
|
Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(2, hosSink.lessThan5KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.between5KBAnd10KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.between10KBAnd50KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.between50KBAnd100KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.between100KBAnd1MBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.greaterThan1MBChunksCounter.getCount());
|
||||||
|
|
||||||
|
testHarness.close();
|
||||||
//测试批量上传
|
//测试批量上传
|
||||||
|
data = RandomUtil.randomString(10000).getBytes();
|
||||||
configuration.setString(Configs.SINK_TYPE, "hos");
|
configuration.setString(Configs.SINK_TYPE, "hos");
|
||||||
configuration.setBoolean(Configs.SINK_BATCH, true);
|
configuration.setBoolean(Configs.SINK_BATCH, true);
|
||||||
configuration.setInteger(Configs.SINK_BATCH_COUNT, 2);
|
configuration.setInteger(Configs.SINK_BATCH_COUNT, 2);
|
||||||
@@ -324,10 +377,16 @@ public class FileChunkCombinerTests {
|
|||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
|
fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
|
||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
Assert.assertEquals(1, hosSink.sendHosCounter.getCount());
|
Assert.assertEquals(1, hosSink.sinkRequestsCounter.getCount());
|
||||||
Assert.assertEquals(0, hosSink.sendHosErrorCounter.getCount());
|
Assert.assertEquals(0, hosSink.sinkErrorRequestsCounter.getCount());
|
||||||
Assert.assertEquals(2, hosSink.sendHosFileCounter.getCount());
|
Assert.assertEquals(2, hosSink.sinkFilesCounter.getCount());
|
||||||
Assert.assertEquals(2, hosSink.sendHosChunkCounter.getCount());
|
Assert.assertEquals(2, hosSink.sinkChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.lessThan5KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(2, hosSink.between5KBAnd10KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.between10KBAnd50KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.between50KBAnd100KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.between100KBAnd1MBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hosSink.greaterThan1MBChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -346,10 +405,16 @@ public class FileChunkCombinerTests {
|
|||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
|
fileChunk = new FileChunk("0000000002", "eml", 0, data.length, data, "seek", 1, 5, System.currentTimeMillis());
|
||||||
testHarness.processElement(new StreamRecord<>(fileChunk));
|
testHarness.processElement(new StreamRecord<>(fileChunk));
|
||||||
Assert.assertEquals("上传文件到hbase错误", 0, hBaseSink.sendHBaseErrorCounter.getCount());
|
Assert.assertEquals(3, hBaseSink.sinkRequestsCounter.getCount());
|
||||||
Assert.assertEquals("上传文件到hbase次数错误", 3, hBaseSink.sendHBaseCounter.getCount());
|
Assert.assertEquals(0, hBaseSink.sinkErrorRequestsCounter.getCount());
|
||||||
Assert.assertEquals(2, hBaseSink.sendHBaseFileCounter.getCount());
|
Assert.assertEquals(2, hBaseSink.sinkFilesCounter.getCount());
|
||||||
Assert.assertEquals(2, hBaseSink.sendHBaseChunkCounter.getCount());
|
Assert.assertEquals(2, hBaseSink.sinkChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(2, hBaseSink.lessThan5KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hBaseSink.between5KBAnd10KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hBaseSink.between10KBAnd50KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hBaseSink.between50KBAnd100KBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hBaseSink.between100KBAnd1MBChunksCounter.getCount());
|
||||||
|
Assert.assertEquals(0, hBaseSink.greaterThan1MBChunksCounter.getCount());
|
||||||
testHarness.close();
|
testHarness.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -617,7 +682,7 @@ public class FileChunkCombinerTests {
|
|||||||
triggers.add(LastChunkOrNoDataInTimeTrigger.of(windowIdleTime * 1000));
|
triggers.add(LastChunkOrNoDataInTimeTrigger.of(windowIdleTime * 1000));
|
||||||
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
|
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
|
||||||
env.addSource(source)
|
env.addSource(source)
|
||||||
.map(new ParseMessagePackMapFunction(false, Long.MAX_VALUE,""))
|
.map(new ParseMessagePackMapFunction(false, Long.MAX_VALUE, ""))
|
||||||
.filter(new FileChunkFilterFunction(Long.MAX_VALUE, ""))
|
.filter(new FileChunkFilterFunction(Long.MAX_VALUE, ""))
|
||||||
.assignTimestampsAndWatermarks(watermarkStrategy)
|
.assignTimestampsAndWatermarks(watermarkStrategy)
|
||||||
.keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
|
.keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
|
||||||
|
|||||||
Reference in New Issue
Block a user