diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/ArrayHistogram.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/ArrayHistogram.java index 86f4c95..cea72d9 100644 --- a/druid-hdrhistogram/src/main/java/org/HdrHistogram/ArrayHistogram.java +++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/ArrayHistogram.java @@ -1,361 +1,388 @@ -package org.HdrHistogram; /** - * Written by Gil Tene of Azul Systems, and released to the public domain, - * as explained at http://creativecommons.org/publicdomain/zero/1.0/ - * - * @author Gil Tene - */ - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.zip.DataFormatException; - -/** - *

A High Dynamic Range (HDR) Histogram

- *

- * {@link ArrayHistogram} supports the recording and analyzing sampled data value counts across a configurable integer value - * range with configurable value precision within the range. Value precision is expressed as the number of significant - * digits in the value recording, and provides control over value quantization behavior across the value range and the - * subsequent value resolution at any given level. - *

- * For example, a Histogram could be configured to track the counts of observed integer values between 0 and - * 3,600,000,000 while maintaining a value precision of 3 significant digits across that range. Value quantization - * within the range will thus be no larger than 1/1,000th (or 0.1%) of any value. This example Histogram could - * be used to track and analyze the counts of observed response times ranging between 1 microsecond and 1 hour - * in magnitude, while maintaining a value resolution of 1 microsecond up to 1 millisecond, a resolution of - * 1 millisecond (or better) up to one second, and a resolution of 1 second (or better) up to 1,000 seconds. At its - * maximum tracked value (1 hour), it would still maintain a resolution of 3.6 seconds (or better). - *

- * Histogram tracks value counts in long fields. Smaller field types are available in the - * {@link IntCountsHistogram} and {@link ShortCountsHistogram} implementations of - * {@link AbstractHistogram}. - *

- * Auto-resizing: When constructed with no specified value range range (or when auto-resize is turned on with {@link - * ArrayHistogram#setAutoResize}) a {@link ArrayHistogram} will auto-resize its dynamic range to include recorded values as - * they are encountered. Note that recording calls that cause auto-resizing may take longer to execute, as resizing - * incurs allocation and copying of internal data structures. - *

- * See package description for {@link org.HdrHistogram} for details. - */ - -public class ArrayHistogram extends AbstractHistogram implements Histogramer{ - long totalCount; - long[] counts; - int normalizingIndexOffset; - - @Override - long getCountAtIndex(final int index) { - return counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)]; - } - - @Override - long getCountAtNormalizedIndex(final int index) { - return counts[index]; - } - - @Override - void incrementCountAtIndex(final int index) { - counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)]++; - } - - @Override - void addToCountAtIndex(final int index, final long value) { - // 正常情况下normalizingIndexOffset = 0, index不用偏移 - counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)] += value; - } - - @Override - void setCountAtIndex(int index, long value) { - counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)] = value; - } - - @Override - void setCountAtNormalizedIndex(int index, long value) { - counts[index] = value; - } - - @Override - int getNormalizingIndexOffset() { - return normalizingIndexOffset; - } - - @Override - void setNormalizingIndexOffset(int normalizingIndexOffset) { - this.normalizingIndexOffset = normalizingIndexOffset; - } - - @Override - void setIntegerToDoubleValueConversionRatio(double integerToDoubleValueConversionRatio) { - nonConcurrentSetIntegerToDoubleValueConversionRatio(integerToDoubleValueConversionRatio); - } - - @Override - void shiftNormalizingIndexByOffset(int offsetToAdd, - boolean lowestHalfBucketPopulated, - double newIntegerToDoubleValueConversionRatio) { - nonConcurrentNormalizingIndexShift(offsetToAdd, lowestHalfBucketPopulated); - } - - @Override - void clearCounts() { - Arrays.fill(counts, 0); - totalCount = 0; - } - - @Override - public Histogramer makeCopy() { - return miniCopy(); - } - - @Override - public ArrayHistogram copy() { - ArrayHistogram copy = new ArrayHistogram(this); - copy.add(this); - return copy; - } - - public ArrayHistogram miniCopy() { - ArrayHistogram copy = new ArrayHistogram(lowestDiscernibleValue, maxValue < highestTrackableValue ? Math.max(maxValue, lowestDiscernibleValue * 2) : highestTrackableValue, numberOfSignificantValueDigits); - copy.add(this); - return copy; - } - - @Override - public ArrayHistogram copyCorrectedForCoordinatedOmission(final long expectedIntervalBetweenValueSamples) { - ArrayHistogram copy = new ArrayHistogram(this); - copy.addWhileCorrectingForCoordinatedOmission(this, expectedIntervalBetweenValueSamples); - return copy; - } - - @Override - public long getTotalCount() { - return totalCount; - } - - @Override - void setTotalCount(final long totalCount) { - this.totalCount = totalCount; - } - - @Override - void incrementTotalCount() { - totalCount++; - } - - @Override - void addToTotalCount(final long value) { - totalCount += value; - } - - @Override - int _getEstimatedFootprintInBytes() { - return (512 + (8 * counts.length)); - } - - @Override - void resize(long newHighestTrackableValue) { - int oldNormalizedZeroIndex = normalizeIndex(0, normalizingIndexOffset, countsArrayLength); - - establishSize(newHighestTrackableValue); - - int countsDelta = countsArrayLength - counts.length; - - counts = Arrays.copyOf(counts, countsArrayLength); - - if (oldNormalizedZeroIndex != 0) { - // We need to shift the stuff from the zero index and up to the end of the array: - int newNormalizedZeroIndex = oldNormalizedZeroIndex + countsDelta; - int lengthToCopy = (countsArrayLength - countsDelta) - oldNormalizedZeroIndex; - System.arraycopy(counts, oldNormalizedZeroIndex, counts, newNormalizedZeroIndex, lengthToCopy); - Arrays.fill(counts, oldNormalizedZeroIndex, newNormalizedZeroIndex, 0); - } - } - - /** - * Construct an auto-resizing histogram with a lowest discernible value of 1 and an auto-adjusting - * highestTrackableValue. Can auto-resize up to track values up to (Long.MAX_VALUE / 2). - * - * @param numberOfSignificantValueDigits Specifies the precision to use. This is the number of significant - * decimal digits to which the histogram will maintain value resolution - * and separation. Must be a non-negative integer between 0 and 5. - */ - public ArrayHistogram(final int numberOfSignificantValueDigits) { - this(1, 2, numberOfSignificantValueDigits); - setAutoResize(true); - } - - /** - * Construct a Histogram given the Highest value to be tracked and a number of significant decimal digits. The - * histogram will be constructed to implicitly track (distinguish from 0) values as low as 1. - * - * @param highestTrackableValue The highest value to be tracked by the histogram. Must be a positive - * integer that is {@literal >=} 2. - * @param numberOfSignificantValueDigits Specifies the precision to use. This is the number of significant - * decimal digits to which the histogram will maintain value resolution - * and separation. Must be a non-negative integer between 0 and 5. - */ - public ArrayHistogram(final long highestTrackableValue, final int numberOfSignificantValueDigits) { - this(1, highestTrackableValue, numberOfSignificantValueDigits); - } - - /** - * Construct a Histogram given the Lowest and Highest values to be tracked and a number of significant - * decimal digits. Providing a lowestDiscernibleValue is useful is situations where the units used - * for the histogram's values are much smaller that the minimal accuracy required. E.g. when tracking - * time values stated in nanosecond units, where the minimal accuracy required is a microsecond, the - * proper value for lowestDiscernibleValue would be 1000. - * - * @param lowestDiscernibleValue The lowest value that can be discerned (distinguished from 0) by the - * histogram. Must be a positive integer that is {@literal >=} 1. May be - * internally rounded down to nearest power of 2. - * @param highestTrackableValue The highest value to be tracked by the histogram. Must be a positive - * integer that is {@literal >=} (2 * lowestDiscernibleValue). - * @param numberOfSignificantValueDigits Specifies the precision to use. This is the number of significant - * decimal digits to which the histogram will maintain value resolution - * and separation. Must be a non-negative integer between 0 and 5. - */ - public ArrayHistogram(final long lowestDiscernibleValue, final long highestTrackableValue, - final int numberOfSignificantValueDigits) { - this(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, true); - } - - /** - * Construct a histogram with the same range settings as a given source histogram, - * duplicating the source's start/end timestamps (but NOT its contents) - * @param source The source histogram to duplicate - */ - public ArrayHistogram(final AbstractHistogram source) { - this(source, true); - } - - ArrayHistogram(final AbstractHistogram source, boolean allocateCountsArray) { - super(source); - if (allocateCountsArray) { - counts = new long[countsArrayLength]; - } - wordSizeInBytes = 8; - } - - ArrayHistogram(final long lowestDiscernibleValue, final long highestTrackableValue, - final int numberOfSignificantValueDigits, boolean allocateCountsArray) { - super(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); - if (allocateCountsArray) { - counts = new long[countsArrayLength]; - } - // 写死 = 8 - wordSizeInBytes = 8; - } - - /** - * Construct a new histogram by decoding it from a ByteBuffer. - * @param buffer The buffer to decode from - * @param minBarForHighestTrackableValue Force highestTrackableValue to be set at least this high - * @return The newly constructed histogram - */ - public static ArrayHistogram decodeFromByteBuffer(final ByteBuffer buffer, - final long minBarForHighestTrackableValue) { - return decodeFromByteBuffer(buffer, ArrayHistogram.class, minBarForHighestTrackableValue); - } - - /** - * Construct a new histogram by decoding it from a compressed form in a ByteBuffer. - * @param buffer The buffer to decode from - * @param minBarForHighestTrackableValue Force highestTrackableValue to be set at least this high - * @return The newly constructed histogram - * @throws DataFormatException on error parsing/decompressing the buffer - */ - public static ArrayHistogram decodeFromCompressedByteBuffer(final ByteBuffer buffer, - final long minBarForHighestTrackableValue) - throws DataFormatException { - return decodeFromCompressedByteBuffer(buffer, ArrayHistogram.class, minBarForHighestTrackableValue); - } - - private void readObject(final ObjectInputStream o) - throws IOException, ClassNotFoundException { - o.defaultReadObject(); - } - - /** - * Construct a new Histogram by decoding it from a String containing a base64 encoded - * compressed histogram representation. - * - * @param base64CompressedHistogramString A string containing a base64 encoding of a compressed histogram - * @return A Histogream decoded from the string - * @throws DataFormatException on error parsing/decompressing the input - */ - public static ArrayHistogram fromString(final String base64CompressedHistogramString) - throws DataFormatException { - // 这还有个base64字符串的解析 - return decodeFromCompressedByteBuffer( - ByteBuffer.wrap(Base64Helper.parseBase64Binary(base64CompressedHistogramString)), - 0); - } - - @Override - public List percentileList(int percentileTicksPerHalfDistance) { - List percentiles = new ArrayList<>(); - for (HistogramIterationValue percentile : this.percentiles(percentileTicksPerHalfDistance)) { - if(percentile.getCountAddedInThisIterationStep() > 0){ - percentiles.add(new Percentile(percentile.getValueIteratedTo(), percentile.getCountAddedInThisIterationStep(), percentile.getPercentile())); - } - } - return percentiles; - } - - @Override - public Histogramer resetHistogram() { - if(isAutoResize()){ - return new ArrayHistogram(this.numberOfSignificantValueDigits); - }else{ - this.reset(); - return this; - } - } - - @Override - public Histogramer merge(Histogramer histogram) { - if(histogram instanceof AbstractHistogram){ - this.add((AbstractHistogram)histogram); - return this; - }else if(histogram instanceof DirectMapHistogram){ - try { - ((DirectMapHistogram)histogram).mergeInto(this); - return this; - } catch (Exception e) { - throw new RuntimeException(e); - } - }else{ - throw new UnsupportedOperationException("unsupported method"); - } - } - - @Override - public byte[] toBytes() { - ByteBuffer byteBuffer = ByteBuffer.allocate(this.getNeededByteBufferCapacity()); - this.encodeIntoByteBuffer(byteBuffer); - return byteBuffer2Bytes(byteBuffer); - } - - public static ArrayHistogram fromBytes(byte[] bytes) { - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - return fromByteBuffer(byteBuffer); - } - - public static ArrayHistogram fromByteBuffer(ByteBuffer byteBuffer) { - int initPosition = byteBuffer.position(); - int cookie = byteBuffer.getInt(initPosition); - if(DirectMapHistogram.getCookieBase(cookie) == DirectMapHistogram.V2CompressedEncodingCookieBase){ - try { - return ArrayHistogram.decodeFromCompressedByteBuffer(byteBuffer, 2); - } catch (DataFormatException e) { - throw new RuntimeException(e); - } - }else if(DirectMapHistogram.getCookieBase(cookie) == DirectMapHistogram.V2EncodingCookieBase){ - return ArrayHistogram.decodeFromByteBuffer(byteBuffer, 2); - } - throw new UnsupportedOperationException("unsupported method"); - } -} +package org.HdrHistogram; /** + * Written by Gil Tene of Azul Systems, and released to the public domain, + * as explained at http://creativecommons.org/publicdomain/zero/1.0/ + * + * @author Gil Tene + */ + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.zip.DataFormatException; + +/** + *

A High Dynamic Range (HDR) Histogram

+ *

+ * {@link ArrayHistogram} supports the recording and analyzing sampled data value counts across a configurable integer value + * range with configurable value precision within the range. Value precision is expressed as the number of significant + * digits in the value recording, and provides control over value quantization behavior across the value range and the + * subsequent value resolution at any given level. + *

+ * For example, a Histogram could be configured to track the counts of observed integer values between 0 and + * 3,600,000,000 while maintaining a value precision of 3 significant digits across that range. Value quantization + * within the range will thus be no larger than 1/1,000th (or 0.1%) of any value. This example Histogram could + * be used to track and analyze the counts of observed response times ranging between 1 microsecond and 1 hour + * in magnitude, while maintaining a value resolution of 1 microsecond up to 1 millisecond, a resolution of + * 1 millisecond (or better) up to one second, and a resolution of 1 second (or better) up to 1,000 seconds. At its + * maximum tracked value (1 hour), it would still maintain a resolution of 3.6 seconds (or better). + *

+ * Histogram tracks value counts in long fields. Smaller field types are available in the + * {@link IntCountsHistogram} and {@link ShortCountsHistogram} implementations of + * {@link AbstractHistogram}. + *

+ * Auto-resizing: When constructed with no specified value range range (or when auto-resize is turned on with {@link + * ArrayHistogram#setAutoResize}) a {@link ArrayHistogram} will auto-resize its dynamic range to include recorded values as + * they are encountered. Note that recording calls that cause auto-resizing may take longer to execute, as resizing + * incurs allocation and copying of internal data structures. + *

+ * See package description for {@link org.HdrHistogram} for details. + */ + +public class ArrayHistogram extends AbstractHistogram implements Histogramer{ + long totalCount; + long[] counts; + int normalizingIndexOffset; + + @Override + long getCountAtIndex(final int index) { + return counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)]; + } + + @Override + long getCountAtNormalizedIndex(final int index) { + return counts[index]; + } + + @Override + void incrementCountAtIndex(final int index) { + counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)]++; + } + + @Override + void addToCountAtIndex(final int index, final long value) { + // 正常情况下normalizingIndexOffset = 0, index不用偏移 + counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)] += value; + } + + @Override + void setCountAtIndex(int index, long value) { + counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)] = value; + } + + @Override + void setCountAtNormalizedIndex(int index, long value) { + counts[index] = value; + } + + @Override + int getNormalizingIndexOffset() { + return normalizingIndexOffset; + } + + @Override + void setNormalizingIndexOffset(int normalizingIndexOffset) { + this.normalizingIndexOffset = normalizingIndexOffset; + } + + @Override + void setIntegerToDoubleValueConversionRatio(double integerToDoubleValueConversionRatio) { + nonConcurrentSetIntegerToDoubleValueConversionRatio(integerToDoubleValueConversionRatio); + } + + @Override + void shiftNormalizingIndexByOffset(int offsetToAdd, + boolean lowestHalfBucketPopulated, + double newIntegerToDoubleValueConversionRatio) { + nonConcurrentNormalizingIndexShift(offsetToAdd, lowestHalfBucketPopulated); + } + + @Override + void clearCounts() { + Arrays.fill(counts, 0); + totalCount = 0; + } + + @Override + public Histogramer makeCopy() { + return miniCopy(); + } + + @Override + public ArrayHistogram copy() { + ArrayHistogram copy = new ArrayHistogram(this); + copy.add(this); + return copy; + } + + public ArrayHistogram miniCopy() { + ArrayHistogram copy = new ArrayHistogram(lowestDiscernibleValue, maxValue < highestTrackableValue ? Math.max(maxValue, lowestDiscernibleValue * 2) : highestTrackableValue, numberOfSignificantValueDigits); + copy.add(this); + return copy; + } + + @Override + public ArrayHistogram copyCorrectedForCoordinatedOmission(final long expectedIntervalBetweenValueSamples) { + ArrayHistogram copy = new ArrayHistogram(this); + copy.addWhileCorrectingForCoordinatedOmission(this, expectedIntervalBetweenValueSamples); + return copy; + } + + @Override + public long getTotalCount() { + return totalCount; + } + + @Override + void setTotalCount(final long totalCount) { + this.totalCount = totalCount; + } + + @Override + void incrementTotalCount() { + totalCount++; + } + + @Override + void addToTotalCount(final long value) { + totalCount += value; + } + + @Override + int _getEstimatedFootprintInBytes() { + return (512 + (8 * counts.length)); + } + + @Override + void resize(long newHighestTrackableValue) { + int oldNormalizedZeroIndex = normalizeIndex(0, normalizingIndexOffset, countsArrayLength); + + establishSize(newHighestTrackableValue); + + int countsDelta = countsArrayLength - counts.length; + + counts = Arrays.copyOf(counts, countsArrayLength); + + if (oldNormalizedZeroIndex != 0) { + // We need to shift the stuff from the zero index and up to the end of the array: + int newNormalizedZeroIndex = oldNormalizedZeroIndex + countsDelta; + int lengthToCopy = (countsArrayLength - countsDelta) - oldNormalizedZeroIndex; + System.arraycopy(counts, oldNormalizedZeroIndex, counts, newNormalizedZeroIndex, lengthToCopy); + Arrays.fill(counts, oldNormalizedZeroIndex, newNormalizedZeroIndex, 0); + } + } + + /** + * Construct an auto-resizing histogram with a lowest discernible value of 1 and an auto-adjusting + * highestTrackableValue. Can auto-resize up to track values up to (Long.MAX_VALUE / 2). + * + * @param numberOfSignificantValueDigits Specifies the precision to use. This is the number of significant + * decimal digits to which the histogram will maintain value resolution + * and separation. Must be a non-negative integer between 0 and 5. + */ + public ArrayHistogram(final int numberOfSignificantValueDigits) { + this(1, 2, numberOfSignificantValueDigits); + setAutoResize(true); + } + + /** + * Construct a Histogram given the Highest value to be tracked and a number of significant decimal digits. The + * histogram will be constructed to implicitly track (distinguish from 0) values as low as 1. + * + * @param highestTrackableValue The highest value to be tracked by the histogram. Must be a positive + * integer that is {@literal >=} 2. + * @param numberOfSignificantValueDigits Specifies the precision to use. This is the number of significant + * decimal digits to which the histogram will maintain value resolution + * and separation. Must be a non-negative integer between 0 and 5. + */ + public ArrayHistogram(final long highestTrackableValue, final int numberOfSignificantValueDigits) { + this(1, highestTrackableValue, numberOfSignificantValueDigits); + } + + /** + * Construct a Histogram given the Lowest and Highest values to be tracked and a number of significant + * decimal digits. Providing a lowestDiscernibleValue is useful is situations where the units used + * for the histogram's values are much smaller that the minimal accuracy required. E.g. when tracking + * time values stated in nanosecond units, where the minimal accuracy required is a microsecond, the + * proper value for lowestDiscernibleValue would be 1000. + * + * @param lowestDiscernibleValue The lowest value that can be discerned (distinguished from 0) by the + * histogram. Must be a positive integer that is {@literal >=} 1. May be + * internally rounded down to nearest power of 2. + * @param highestTrackableValue The highest value to be tracked by the histogram. Must be a positive + * integer that is {@literal >=} (2 * lowestDiscernibleValue). + * @param numberOfSignificantValueDigits Specifies the precision to use. This is the number of significant + * decimal digits to which the histogram will maintain value resolution + * and separation. Must be a non-negative integer between 0 and 5. + */ + public ArrayHistogram(final long lowestDiscernibleValue, final long highestTrackableValue, + final int numberOfSignificantValueDigits) { + this(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, true); + } + + /** + * Construct a histogram with the same range settings as a given source histogram, + * duplicating the source's start/end timestamps (but NOT its contents) + * @param source The source histogram to duplicate + */ + public ArrayHistogram(final AbstractHistogram source) { + this(source, true); + } + + ArrayHistogram(final AbstractHistogram source, boolean allocateCountsArray) { + super(source); + if (allocateCountsArray) { + counts = new long[countsArrayLength]; + } + wordSizeInBytes = 8; + } + + ArrayHistogram(final long lowestDiscernibleValue, final long highestTrackableValue, + final int numberOfSignificantValueDigits, boolean allocateCountsArray) { + super(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); + if (allocateCountsArray) { + counts = new long[countsArrayLength]; + } + // 写死 = 8 + wordSizeInBytes = 8; + } + + /** + * Construct a new histogram by decoding it from a ByteBuffer. + * @param buffer The buffer to decode from + * @param minBarForHighestTrackableValue Force highestTrackableValue to be set at least this high + * @return The newly constructed histogram + */ + public static ArrayHistogram decodeFromByteBuffer(final ByteBuffer buffer, + final long minBarForHighestTrackableValue) { + return decodeFromByteBuffer(buffer, ArrayHistogram.class, minBarForHighestTrackableValue); + } + + /** + * Construct a new histogram by decoding it from a compressed form in a ByteBuffer. + * @param buffer The buffer to decode from + * @param minBarForHighestTrackableValue Force highestTrackableValue to be set at least this high + * @return The newly constructed histogram + * @throws DataFormatException on error parsing/decompressing the buffer + */ + public static ArrayHistogram decodeFromCompressedByteBuffer(final ByteBuffer buffer, + final long minBarForHighestTrackableValue) + throws DataFormatException { + return decodeFromCompressedByteBuffer(buffer, ArrayHistogram.class, minBarForHighestTrackableValue); + } + + private void readObject(final ObjectInputStream o) + throws IOException, ClassNotFoundException { + o.defaultReadObject(); + } + + /** + * Construct a new Histogram by decoding it from a String containing a base64 encoded + * compressed histogram representation. + * + * @param base64CompressedHistogramString A string containing a base64 encoding of a compressed histogram + * @return A Histogream decoded from the string + * @throws DataFormatException on error parsing/decompressing the input + */ + public static ArrayHistogram fromString(final String base64CompressedHistogramString) + throws DataFormatException { + // 这还有个base64字符串的解析 + return decodeFromCompressedByteBuffer( + ByteBuffer.wrap(Base64Helper.parseBase64Binary(base64CompressedHistogramString)), + 0); + } + + @Override + public List percentileList(int percentileTicksPerHalfDistance) { + List percentiles = new ArrayList<>(); + for (HistogramIterationValue percentile : this.percentiles(percentileTicksPerHalfDistance)) { + if(percentile.getCountAddedInThisIterationStep() > 0){ + percentiles.add(new Percentile(percentile.getValueIteratedTo(), percentile.getCountAddedInThisIterationStep(), percentile.getPercentile())); + } + } + return percentiles; + } + + @Override + public Map describe() { + long min = getMinValue(); + long max = getMaxValue(); // max = this.maxValue; + long count = getTotalCount(); + double mean = getMean(); + long sum = (long) (mean * count); + mean = Math.round(mean * 100.0) / 100.0; + long p25 = getValueAtPercentile(25); + long p50 = getValueAtPercentile(50); + long p75 = getValueAtPercentile(75); + long p90 = getValueAtPercentile(90); + long p95 = getValueAtPercentile(95); + long p99 = getValueAtPercentile(99); + Map rst = new LinkedHashMap<>(); + rst.put("count", count); + rst.put("mean", mean); + rst.put("sum", sum); + rst.put("min", min); + rst.put("p25", p25); + rst.put("p50", p50); + rst.put("p75", p75); + rst.put("p90", p90); + rst.put("p95", p95); + rst.put("p99", p99); + rst.put("max", max); + return rst; + } + + @Override + public Histogramer resetHistogram() { + if(isAutoResize()){ + return new ArrayHistogram(this.numberOfSignificantValueDigits); + }else{ + this.reset(); + return this; + } + } + + @Override + public Histogramer merge(Histogramer histogram) { + if(histogram instanceof AbstractHistogram){ + this.add((AbstractHistogram)histogram); + return this; + }else if(histogram instanceof DirectMapHistogram){ + try { + ((DirectMapHistogram)histogram).mergeInto(this); + return this; + } catch (Exception e) { + throw new RuntimeException(e); + } + }else{ + throw new UnsupportedOperationException("unsupported method"); + } + } + + @Override + public byte[] toBytes() { + ByteBuffer byteBuffer = ByteBuffer.allocate(this.getNeededByteBufferCapacity()); + this.encodeIntoByteBuffer(byteBuffer); + return byteBuffer2Bytes(byteBuffer); + } + + public static ArrayHistogram fromBytes(byte[] bytes) { + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + return fromByteBuffer(byteBuffer); + } + + public static ArrayHistogram fromByteBuffer(ByteBuffer byteBuffer) { + int initPosition = byteBuffer.position(); + int cookie = byteBuffer.getInt(initPosition); + if(DirectMapHistogram.getCookieBase(cookie) == DirectMapHistogram.V2CompressedEncodingCookieBase){ + try { + return ArrayHistogram.decodeFromCompressedByteBuffer(byteBuffer, 2); + } catch (DataFormatException e) { + throw new RuntimeException(e); + } + }else if(DirectMapHistogram.getCookieBase(cookie) == DirectMapHistogram.V2EncodingCookieBase){ + return ArrayHistogram.decodeFromByteBuffer(byteBuffer, 2); + } + throw new UnsupportedOperationException("unsupported method"); + } +} diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectArrayHistogram.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectArrayHistogram.java index 6ab99ab..0b2636f 100644 --- a/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectArrayHistogram.java +++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectArrayHistogram.java @@ -1,203 +1,234 @@ -package org.HdrHistogram; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -public class DirectArrayHistogram extends AbstractHistogram implements Histogramer{ - long totalCount; - int normalizingIndexOffset; - private ByteBuffer byteBuffer; - private int initPosition; - - public DirectArrayHistogram(final long lowestDiscernibleValue, final long highestTrackableValue, - final int numberOfSignificantValueDigits, ByteBuffer byteBuffer) { - super(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); - this.byteBuffer = byteBuffer; - this.initPosition = byteBuffer.position(); - wordSizeInBytes = 8; - } - - // druid内部使用 - public void resetByteBuffer(ByteBuffer byteBuffer){ - this.byteBuffer = byteBuffer; - this.initPosition = byteBuffer.position(); - } - - @Override - long getCountAtIndex(int index) { - int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength); - return byteBuffer.getLong(initPosition + i * 8); - } - - @Override - long getCountAtNormalizedIndex(int index) { - return byteBuffer.getLong(initPosition + index * 8); - } - - @Override - void incrementCountAtIndex(int index) { - int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength); - int pos = initPosition + i * 8; - long val = byteBuffer.getLong(pos); - byteBuffer.putLong(pos, val + 1); - } - - @Override - void addToCountAtIndex(int index, long value) { - int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength); - int pos = initPosition + i * 8; - long val = byteBuffer.getLong(pos); - byteBuffer.putLong(pos, val + value); - } - - @Override - void setCountAtIndex(int index, long value) { - int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength); - int pos = initPosition + i * 8; - byteBuffer.putLong(pos, value); - } - - @Override - void setCountAtNormalizedIndex(int index, long value) { - int pos = initPosition + index * 8; - byteBuffer.putLong(pos, value); - } - - @Override - int getNormalizingIndexOffset() { - return normalizingIndexOffset; - } - - @Override - void setNormalizingIndexOffset(int normalizingIndexOffset) { - if(normalizingIndexOffset == 0){ - this.normalizingIndexOffset = normalizingIndexOffset; - }else{ - throw new RuntimeException("cant not setNormalizingIndexOffset"); - } - } - - @Override - void setIntegerToDoubleValueConversionRatio(double integerToDoubleValueConversionRatio) { - nonConcurrentSetIntegerToDoubleValueConversionRatio(integerToDoubleValueConversionRatio); - } - - @Override - void shiftNormalizingIndexByOffset(int offsetToAdd, boolean lowestHalfBucketPopulated, double newIntegerToDoubleValueConversionRatio) { - nonConcurrentNormalizingIndexShift(offsetToAdd, lowestHalfBucketPopulated); - } - - @Override - void clearCounts() { - for (int i = 0; i < countsArrayLength; i++) { - byteBuffer.putLong(initPosition + i * 8, 0L); - } - totalCount = 0; - } - - @Override - public Histogramer makeCopy() { - return miniCopy(); - } - - @Override - public ArrayHistogram copy() { - ArrayHistogram copy = new ArrayHistogram(this); - copy.add(this); - return copy; - } - - public ArrayHistogram miniCopy() { - ArrayHistogram copy = new ArrayHistogram(lowestDiscernibleValue, maxValue < highestTrackableValue ? Math.max(maxValue, lowestDiscernibleValue * 2) : highestTrackableValue, numberOfSignificantValueDigits); - copy.add(this); - return copy; - } - - @Override - public AbstractHistogram copyCorrectedForCoordinatedOmission(long expectedIntervalBetweenValueSamples) { - Histogram copy = new Histogram(this); - copy.addWhileCorrectingForCoordinatedOmission(this, expectedIntervalBetweenValueSamples); - return copy; - } - - @Override - public long getTotalCount() { - return totalCount; - } - - @Override - void setTotalCount(final long totalCount) { - this.totalCount = totalCount; - } - - @Override - void incrementTotalCount() { - totalCount++; - } - - @Override - void addToTotalCount(long value) { - totalCount += value; - } - - - @Override - int _getEstimatedFootprintInBytes() { - return (512 + (8 * countsArrayLength)); - } - - @Override - void resize(long newHighestTrackableValue) { - throw new RuntimeException("cant not resize"); - } - - public static int getCountsArrayLength(long lowestDiscernibleValue, long highestTrackableValue, int numberOfSignificantValueDigits){ - Histogram his = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, false); - return his.countsArrayLength; - } - - public static final int getUpdatableSerializationBytes(long lowestDiscernibleValue, long highestTrackableValue, int numberOfSignificantValueDigits){ - return getCountsArrayLength(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits) * 8; - } - - @Override - public List percentileList(int percentileTicksPerHalfDistance) { - List percentiles = new ArrayList<>(); - for (HistogramIterationValue percentile : this.percentiles(percentileTicksPerHalfDistance)) { - if(percentile.getCountAddedInThisIterationStep() > 0){ - percentiles.add(new Percentile(percentile.getValueIteratedTo(), percentile.getCountAddedInThisIterationStep(), percentile.getPercentile())); - } - } - return percentiles; - } - - @Override - public Histogramer resetHistogram() { - throw new UnsupportedOperationException("unsupported method"); - } - - @Override - public Histogramer merge(Histogramer histogram) { - if(histogram instanceof AbstractHistogram){ - this.add((AbstractHistogram)histogram); - return this; - }else if(histogram instanceof DirectMapHistogram){ - try { - ((DirectMapHistogram)histogram).mergeInto(this); - return this; - } catch (Exception e) { - throw new RuntimeException(e); - } - }else{ - throw new UnsupportedOperationException("unsupported method"); - } - } - - @Override - public byte[] toBytes() { - ByteBuffer byteBuffer = ByteBuffer.allocate(this.getNeededByteBufferCapacity()); - this.encodeIntoByteBuffer(byteBuffer); - return byteBuffer2Bytes(byteBuffer); - } -} +package org.HdrHistogram; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class DirectArrayHistogram extends AbstractHistogram implements Histogramer{ + long totalCount; + int normalizingIndexOffset; + private ByteBuffer byteBuffer; + private int initPosition; + + public DirectArrayHistogram(final long lowestDiscernibleValue, final long highestTrackableValue, + final int numberOfSignificantValueDigits, ByteBuffer byteBuffer) { + super(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); + this.byteBuffer = byteBuffer; + this.initPosition = byteBuffer.position(); + wordSizeInBytes = 8; + } + + // druid内部使用 + public void resetByteBuffer(ByteBuffer byteBuffer){ + this.byteBuffer = byteBuffer; + this.initPosition = byteBuffer.position(); + } + + @Override + long getCountAtIndex(int index) { + int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength); + return byteBuffer.getLong(initPosition + i * 8); + } + + @Override + long getCountAtNormalizedIndex(int index) { + return byteBuffer.getLong(initPosition + index * 8); + } + + @Override + void incrementCountAtIndex(int index) { + int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength); + int pos = initPosition + i * 8; + long val = byteBuffer.getLong(pos); + byteBuffer.putLong(pos, val + 1); + } + + @Override + void addToCountAtIndex(int index, long value) { + int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength); + int pos = initPosition + i * 8; + long val = byteBuffer.getLong(pos); + byteBuffer.putLong(pos, val + value); + } + + @Override + void setCountAtIndex(int index, long value) { + int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength); + int pos = initPosition + i * 8; + byteBuffer.putLong(pos, value); + } + + @Override + void setCountAtNormalizedIndex(int index, long value) { + int pos = initPosition + index * 8; + byteBuffer.putLong(pos, value); + } + + @Override + int getNormalizingIndexOffset() { + return normalizingIndexOffset; + } + + @Override + void setNormalizingIndexOffset(int normalizingIndexOffset) { + if(normalizingIndexOffset == 0){ + this.normalizingIndexOffset = normalizingIndexOffset; + }else{ + throw new RuntimeException("cant not setNormalizingIndexOffset"); + } + } + + @Override + void setIntegerToDoubleValueConversionRatio(double integerToDoubleValueConversionRatio) { + nonConcurrentSetIntegerToDoubleValueConversionRatio(integerToDoubleValueConversionRatio); + } + + @Override + void shiftNormalizingIndexByOffset(int offsetToAdd, boolean lowestHalfBucketPopulated, double newIntegerToDoubleValueConversionRatio) { + nonConcurrentNormalizingIndexShift(offsetToAdd, lowestHalfBucketPopulated); + } + + @Override + void clearCounts() { + for (int i = 0; i < countsArrayLength; i++) { + byteBuffer.putLong(initPosition + i * 8, 0L); + } + totalCount = 0; + } + + @Override + public Histogramer makeCopy() { + return miniCopy(); + } + + @Override + public ArrayHistogram copy() { + ArrayHistogram copy = new ArrayHistogram(this); + copy.add(this); + return copy; + } + + public ArrayHistogram miniCopy() { + ArrayHistogram copy = new ArrayHistogram(lowestDiscernibleValue, maxValue < highestTrackableValue ? Math.max(maxValue, lowestDiscernibleValue * 2) : highestTrackableValue, numberOfSignificantValueDigits); + copy.add(this); + return copy; + } + + @Override + public AbstractHistogram copyCorrectedForCoordinatedOmission(long expectedIntervalBetweenValueSamples) { + Histogram copy = new Histogram(this); + copy.addWhileCorrectingForCoordinatedOmission(this, expectedIntervalBetweenValueSamples); + return copy; + } + + @Override + public long getTotalCount() { + return totalCount; + } + + @Override + void setTotalCount(final long totalCount) { + this.totalCount = totalCount; + } + + @Override + void incrementTotalCount() { + totalCount++; + } + + @Override + void addToTotalCount(long value) { + totalCount += value; + } + + + @Override + int _getEstimatedFootprintInBytes() { + return (512 + (8 * countsArrayLength)); + } + + @Override + void resize(long newHighestTrackableValue) { + throw new RuntimeException("cant not resize"); + } + + public static int getCountsArrayLength(long lowestDiscernibleValue, long highestTrackableValue, int numberOfSignificantValueDigits){ + Histogram his = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, false); + return his.countsArrayLength; + } + + public static final int getUpdatableSerializationBytes(long lowestDiscernibleValue, long highestTrackableValue, int numberOfSignificantValueDigits){ + return getCountsArrayLength(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits) * 8; + } + + @Override + public List percentileList(int percentileTicksPerHalfDistance) { + List percentiles = new ArrayList<>(); + for (HistogramIterationValue percentile : this.percentiles(percentileTicksPerHalfDistance)) { + if(percentile.getCountAddedInThisIterationStep() > 0){ + percentiles.add(new Percentile(percentile.getValueIteratedTo(), percentile.getCountAddedInThisIterationStep(), percentile.getPercentile())); + } + } + return percentiles; + } + + @Override + public Map describe() { + long min = getMinValue(); + long max = getMaxValue(); // max = this.maxValue; + long count = getTotalCount(); + double mean = getMean(); + long sum = (long) (mean * count); + mean = Math.round(mean * 100.0) / 100.0; + long p25 = getValueAtPercentile(25); + long p50 = getValueAtPercentile(50); + long p75 = getValueAtPercentile(75); + long p90 = getValueAtPercentile(90); + long p95 = getValueAtPercentile(95); + long p99 = getValueAtPercentile(99); + Map rst = new LinkedHashMap<>(); + rst.put("count", count); + rst.put("mean", mean); + rst.put("sum", sum); + rst.put("min", min); + rst.put("p25", p25); + rst.put("p50", p50); + rst.put("p75", p75); + rst.put("p90", p90); + rst.put("p95", p95); + rst.put("p99", p99); + rst.put("max", max); + return rst; + } + + @Override + public Histogramer resetHistogram() { + throw new UnsupportedOperationException("unsupported method"); + } + + @Override + public Histogramer merge(Histogramer histogram) { + if(histogram instanceof AbstractHistogram){ + this.add((AbstractHistogram)histogram); + return this; + }else if(histogram instanceof DirectMapHistogram){ + try { + ((DirectMapHistogram)histogram).mergeInto(this); + return this; + } catch (Exception e) { + throw new RuntimeException(e); + } + }else{ + throw new UnsupportedOperationException("unsupported method"); + } + } + + @Override + public byte[] toBytes() { + ByteBuffer byteBuffer = ByteBuffer.allocate(this.getNeededByteBufferCapacity()); + this.encodeIntoByteBuffer(byteBuffer); + return byteBuffer2Bytes(byteBuffer); + } +} diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectMapHistogram.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectMapHistogram.java index a35e4cd..bc0951d 100644 --- a/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectMapHistogram.java +++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectMapHistogram.java @@ -1,486 +1,492 @@ -package org.HdrHistogram; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.List; -import java.util.zip.DataFormatException; -import java.util.zip.Inflater; - -import static java.nio.ByteOrder.BIG_ENDIAN; - -/** - * 直接映射字节数组到Histogram,只读的Histogram,用于druid查询,减少gc减少计算,序列化后的是稀疏数组的形式 - */ -public class DirectMapHistogram implements Histogramer{ - static final int V2maxWordSizeInBytes = 9; // LEB128-64b9B + ZigZag require up to 9 bytes per word - static final int V2EncodingCookieBase = 0x1c849303; - static final int V2CompressedEncodingCookieBase = 0x1c849304; - - final ByteBuffer byteBuffer; - final int initPosition; - long totalCount; - - private DirectMapHistogram(ByteBuffer byteBuffer) { - int initPosition = byteBuffer.position(); - this.byteBuffer = byteBuffer; - this.initPosition = initPosition; - this.totalCount = -1; - } - - public static boolean byteBufferCanToDirectMapHistogram(ByteBuffer byteBuffer) { - int initPosition = byteBuffer.position(); - int cookie = byteBuffer.getInt(initPosition); - return getCookieBase(cookie) == V2EncodingCookieBase || getCookieBase(cookie) == V2CompressedEncodingCookieBase; - } - - public static DirectMapHistogram wrapBytes(byte[] bytes) { - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - return wrapByteBuffer(byteBuffer); - } - - public static DirectMapHistogram wrapByteBuffer(ByteBuffer byteBuffer) { - if(byteBufferCanToDirectMapHistogram(byteBuffer)){ - DirectMapHistogram hll = new DirectMapHistogram(byteBuffer); - return hll; - } - throw new RuntimeException("can not wrapByteBuffer"); - } - - public void mergeInto(AbstractHistogram histogram) throws Exception{ - int cookie = byteBuffer.getInt(initPosition); - if(getCookieBase(cookie) == V2CompressedEncodingCookieBase){ - final int lengthOfCompressedContents = byteBuffer.getInt(initPosition + 4); - final Inflater decompressor = new Inflater(); - - if (byteBuffer.hasArray()) { - decompressor.setInput(byteBuffer.array(), initPosition + 8, lengthOfCompressedContents); - } else { - byte[] compressedContents = new byte[lengthOfCompressedContents]; - byteBuffer.position(initPosition + 8); - try { - byteBuffer.get(compressedContents); - decompressor.setInput(compressedContents); - }finally { - byteBuffer.position(initPosition); - } - } - final int headerSize = 40; - final ByteBuffer headerBuffer = ByteBuffer.allocate(headerSize).order(BIG_ENDIAN); - decompressor.inflate(headerBuffer.array()); - - cookie = headerBuffer.getInt(); - final int payloadLengthInBytes; - final int normalizingIndexOffset; - final int numberOfSignificantValueDigits; - final long lowestTrackableUnitValue; - long highestTrackableValue; - final double integerToDoubleValueConversionRatio; - - assert getCookieBase(cookie) == V2EncodingCookieBase; - - payloadLengthInBytes = headerBuffer.getInt(4); - normalizingIndexOffset = headerBuffer.getInt(8); - numberOfSignificantValueDigits = headerBuffer.getInt( 12); - lowestTrackableUnitValue = headerBuffer.getLong(16); - highestTrackableValue = headerBuffer.getLong(24); - integerToDoubleValueConversionRatio = headerBuffer.getDouble(32); - - highestTrackableValue = Math.max(highestTrackableValue, 2); - - final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits); - final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue)/Math.log(2)); - final long unitMagnitudeMask = (1 << unitMagnitude) - 1; - int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution)/Math.log(2)); - final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1; - final int subBucketCount = 1 << subBucketCountMagnitude; - final int subBucketHalfCount = subBucketCount / 2; - final long subBucketMask = ((long)subBucketCount - 1) << unitMagnitude; - if (subBucketCountMagnitude + unitMagnitude > 62) { - // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long. - // Technically it still sort of works if their sum is 63: you can represent all but the last number - // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here - // fits in 62 bits is debatable, and it makes it harder to work through the logic. - // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative. - throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " + - "beyond lowestDiscernibleValue"); - } - - final int expectedCapacity = payloadLengthInBytes; - - ByteBuffer sourceBuffer = ByteBuffer.allocate(expectedCapacity).order(BIG_ENDIAN); - int decompressedByteCount = decompressor.inflate(sourceBuffer.array()); - decompressor.end(); // 必须手动调用,否则快速调用可能内存溢出(堆外内存) - if ((payloadLengthInBytes != Integer.MAX_VALUE) && (decompressedByteCount < payloadLengthInBytes)) { - throw new IllegalArgumentException("The buffer does not contain the indicated payload amount"); - } - assert decompressedByteCount == expectedCapacity; - - int dstIndex = 0; - int endPosition = sourceBuffer.position() + expectedCapacity; //期望的结束读取的索引 - while (sourceBuffer.position() < endPosition) { - long count; - int zerosCount = 0; - // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes): - count = ZigZagEncoding.getLong(sourceBuffer); - if (count < 0) { - long zc = -count; // 0值的连续个数 - if (zc > Integer.MAX_VALUE) { - throw new IllegalArgumentException( - "An encoded zero count of > Integer.MAX_VALUE was encountered in the source"); - } - zerosCount = (int) zc; - } - if (zerosCount > 0) { - dstIndex += zerosCount; // No need to set zeros in array. Just skip them. - } else { - // 单个非连续的0也会被输出 - if(count > 0){ - long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude); - histogram.recordValueWithCount(value, count); - } - dstIndex++; - } - } - - }else if(getCookieBase(cookie) == V2EncodingCookieBase){ - final int payloadLengthInBytes; - final int normalizingIndexOffset; - final int numberOfSignificantValueDigits; - final long lowestTrackableUnitValue; - long highestTrackableValue; - final double integerToDoubleValueConversionRatio; - - payloadLengthInBytes = byteBuffer.getInt(initPosition + 4); - normalizingIndexOffset = byteBuffer.getInt(initPosition + 8); - numberOfSignificantValueDigits = byteBuffer.getInt(initPosition + 12); - lowestTrackableUnitValue = byteBuffer.getLong(initPosition + 16); - highestTrackableValue = byteBuffer.getLong(initPosition + 24); - integerToDoubleValueConversionRatio = byteBuffer.getDouble(initPosition + 32); - - highestTrackableValue = Math.max(highestTrackableValue, 2); - - final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits); - final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue)/Math.log(2)); - final long unitMagnitudeMask = (1 << unitMagnitude) - 1; - int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution)/Math.log(2)); - final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1; - final int subBucketCount = 1 << subBucketCountMagnitude; - final int subBucketHalfCount = subBucketCount / 2; - final long subBucketMask = ((long)subBucketCount - 1) << unitMagnitude; - if (subBucketCountMagnitude + unitMagnitude > 62) { - // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long. - // Technically it still sort of works if their sum is 63: you can represent all but the last number - // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here - // fits in 62 bits is debatable, and it makes it harder to work through the logic. - // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative. - throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " + - "beyond lowestDiscernibleValue"); - } - - final int expectedCapacity =payloadLengthInBytes; - assert expectedCapacity == payloadLengthInBytes; - if(expectedCapacity > byteBuffer.limit() - 40){ - throw new IllegalArgumentException("The buffer does not contain the full Histogram payload"); - } - final int position = initPosition + 40; - final int lengthInBytes = expectedCapacity; - final int wordSizeInBytes = V2maxWordSizeInBytes; - // fillCountsArrayFromSourceBuffer - - ByteBuffer sourceBuffer = byteBuffer.duplicate(); - sourceBuffer.position(position); - final long maxAllowableCountInHistigram = Long.MAX_VALUE; - int dstIndex = 0; - int endPosition = sourceBuffer.position() + lengthInBytes; //期望的结束读取的索引 - while (sourceBuffer.position() < endPosition) { - long count; - int zerosCount = 0; - // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes): - count = ZigZagEncoding.getLong(sourceBuffer); - if (count < 0) { - long zc = -count; // 0值的连续个数 - if (zc > Integer.MAX_VALUE) { - throw new IllegalArgumentException( - "An encoded zero count of > Integer.MAX_VALUE was encountered in the source"); - } - zerosCount = (int) zc; - } - if (zerosCount > 0) { - dstIndex += zerosCount; // No need to set zeros in array. Just skip them. - } else { - // 单个非连续的0也会被输出 - if(count > 0){ - long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude); - histogram.recordValueWithCount(value, count); - } - dstIndex++; - } - } - }else{ - throw new RuntimeException("can not wrapByteBuffer"); - } - } - - final long valueFromIndex(final int index, int subBucketHalfCountMagnitude, int subBucketHalfCount, int unitMagnitude) { - int bucketIndex = (index >> subBucketHalfCountMagnitude) - 1; - int subBucketIndex = (index & (subBucketHalfCount - 1)) + subBucketHalfCount; - if (bucketIndex < 0) { - subBucketIndex -= subBucketHalfCount; - bucketIndex = 0; - } - return valueFromIndex(bucketIndex, subBucketIndex, unitMagnitude); - } - - private long valueFromIndex(final int bucketIndex, final int subBucketIndex, int unitMagnitude) { - return ((long) subBucketIndex) << (bucketIndex + unitMagnitude); - } - - static int getCookieBase(final int cookie) { - return (cookie & ~0xf0); - } - - @Override - public long getTotalCount() { - if(totalCount >= 0){ - return totalCount; - } - try { - totalCount = 0; - int cookie = byteBuffer.getInt(initPosition); - if(getCookieBase(cookie) == V2CompressedEncodingCookieBase){ - final int lengthOfCompressedContents = byteBuffer.getInt(initPosition + 4); - final Inflater decompressor = new Inflater(); - - if (byteBuffer.hasArray()) { - decompressor.setInput(byteBuffer.array(), initPosition + 8, lengthOfCompressedContents); - } else { - byte[] compressedContents = new byte[lengthOfCompressedContents]; - byteBuffer.position(initPosition + 8); - try { - byteBuffer.get(compressedContents); - decompressor.setInput(compressedContents); - }finally { - byteBuffer.position(initPosition); - } - } - final int headerSize = 40; - final ByteBuffer headerBuffer = ByteBuffer.allocate(headerSize).order(BIG_ENDIAN); - decompressor.inflate(headerBuffer.array()); - - cookie = headerBuffer.getInt(); - final int payloadLengthInBytes; - final int normalizingIndexOffset; - final int numberOfSignificantValueDigits; - final long lowestTrackableUnitValue; - long highestTrackableValue; - final double integerToDoubleValueConversionRatio; - - assert getCookieBase(cookie) == V2EncodingCookieBase; - - payloadLengthInBytes = headerBuffer.getInt(4); - normalizingIndexOffset = headerBuffer.getInt(8); - numberOfSignificantValueDigits = headerBuffer.getInt( 12); - lowestTrackableUnitValue = headerBuffer.getLong(16); - highestTrackableValue = headerBuffer.getLong(24); - integerToDoubleValueConversionRatio = headerBuffer.getDouble(32); - - highestTrackableValue = Math.max(highestTrackableValue, 2); - - final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits); - final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue)/Math.log(2)); - final long unitMagnitudeMask = (1 << unitMagnitude) - 1; - int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution)/Math.log(2)); - final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1; - final int subBucketCount = 1 << subBucketCountMagnitude; - final int subBucketHalfCount = subBucketCount / 2; - final long subBucketMask = ((long)subBucketCount - 1) << unitMagnitude; - if (subBucketCountMagnitude + unitMagnitude > 62) { - // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long. - // Technically it still sort of works if their sum is 63: you can represent all but the last number - // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here - // fits in 62 bits is debatable, and it makes it harder to work through the logic. - // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative. - throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " + - "beyond lowestDiscernibleValue"); - } - - final int expectedCapacity = payloadLengthInBytes; - - ByteBuffer sourceBuffer = ByteBuffer.allocate(expectedCapacity).order(BIG_ENDIAN); - int decompressedByteCount = decompressor.inflate(sourceBuffer.array()); - decompressor.end(); // 必须手动调用,否则快速调用可能内存溢出(堆外内存) - if ((payloadLengthInBytes != Integer.MAX_VALUE) && (decompressedByteCount < payloadLengthInBytes)) { - throw new IllegalArgumentException("The buffer does not contain the indicated payload amount"); - } - assert decompressedByteCount == expectedCapacity; - - int dstIndex = 0; - int endPosition = sourceBuffer.position() + expectedCapacity; //期望的结束读取的索引 - while (sourceBuffer.position() < endPosition) { - long count; - int zerosCount = 0; - // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes): - count = ZigZagEncoding.getLong(sourceBuffer); - if (count < 0) { - long zc = -count; // 0值的连续个数 - if (zc > Integer.MAX_VALUE) { - throw new IllegalArgumentException( - "An encoded zero count of > Integer.MAX_VALUE was encountered in the source"); - } - zerosCount = (int) zc; - } - if (zerosCount > 0) { - dstIndex += zerosCount; // No need to set zeros in array. Just skip them. - } else { - // 单个非连续的0也会被输出 - if(count > 0){ - //long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude); - //histogram.recordValueWithCount(value, count); - totalCount += count; - } - dstIndex++; - } - } - return totalCount; - }else if(getCookieBase(cookie) == V2EncodingCookieBase){ - final int payloadLengthInBytes; - final int normalizingIndexOffset; - final int numberOfSignificantValueDigits; - final long lowestTrackableUnitValue; - long highestTrackableValue; - final double integerToDoubleValueConversionRatio; - - payloadLengthInBytes = byteBuffer.getInt(initPosition + 4); - normalizingIndexOffset = byteBuffer.getInt(initPosition + 8); - numberOfSignificantValueDigits = byteBuffer.getInt(initPosition + 12); - lowestTrackableUnitValue = byteBuffer.getLong(initPosition + 16); - highestTrackableValue = byteBuffer.getLong(initPosition + 24); - integerToDoubleValueConversionRatio = byteBuffer.getDouble(initPosition + 32); - - highestTrackableValue = Math.max(highestTrackableValue, 2); - - final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits); - final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue)/Math.log(2)); - final long unitMagnitudeMask = (1 << unitMagnitude) - 1; - int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution)/Math.log(2)); - final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1; - final int subBucketCount = 1 << subBucketCountMagnitude; - final int subBucketHalfCount = subBucketCount / 2; - final long subBucketMask = ((long)subBucketCount - 1) << unitMagnitude; - if (subBucketCountMagnitude + unitMagnitude > 62) { - // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long. - // Technically it still sort of works if their sum is 63: you can represent all but the last number - // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here - // fits in 62 bits is debatable, and it makes it harder to work through the logic. - // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative. - throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " + - "beyond lowestDiscernibleValue"); - } - - final int expectedCapacity =payloadLengthInBytes; - assert expectedCapacity == payloadLengthInBytes; - if(expectedCapacity > byteBuffer.limit() - 40){ - throw new IllegalArgumentException("The buffer does not contain the full Histogram payload"); - } - final int position = initPosition + 40; - final int lengthInBytes = expectedCapacity; - final int wordSizeInBytes = V2maxWordSizeInBytes; - // fillCountsArrayFromSourceBuffer - - ByteBuffer sourceBuffer = byteBuffer.duplicate(); - sourceBuffer.position(position); - final long maxAllowableCountInHistigram = Long.MAX_VALUE; - int dstIndex = 0; - int endPosition = sourceBuffer.position() + lengthInBytes; //期望的结束读取的索引 - while (sourceBuffer.position() < endPosition) { - long count; - int zerosCount = 0; - // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes): - count = ZigZagEncoding.getLong(sourceBuffer); - if (count < 0) { - long zc = -count; // 0值的连续个数 - if (zc > Integer.MAX_VALUE) { - throw new IllegalArgumentException( - "An encoded zero count of > Integer.MAX_VALUE was encountered in the source"); - } - zerosCount = (int) zc; - } - if (zerosCount > 0) { - dstIndex += zerosCount; // No need to set zeros in array. Just skip them. - } else { - // 单个非连续的0也会被输出 - if(count > 0){ - //long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude); - //histogram.recordValueWithCount(value, count); - totalCount += count; - } - dstIndex++; - } - } - return totalCount; - }else{ - throw new UnsupportedOperationException("unsupported method"); - } - } catch (DataFormatException e) { - throw new RuntimeException(e); - } - } - - @Override - public void recordValue(long value) throws RuntimeException { - throw new UnsupportedOperationException("unsupported method"); - } - - @Override - public void recordValueWithCount(long value, long count) throws RuntimeException { - throw new UnsupportedOperationException("unsupported method"); - } - - @Override - public long getValueAtPercentile(double percentile) { - throw new UnsupportedOperationException("unsupported method"); - } - - @Override - public List percentileList(int percentileTicksPerHalfDistance) { - throw new UnsupportedOperationException("unsupported method"); - } - - @Override - public Histogramer resetHistogram() { - throw new UnsupportedOperationException("unsupported method"); - } - - @Override - public Histogramer merge(Histogramer histogram) { - throw new UnsupportedOperationException("unsupported method"); - } - - @Override - public Histogramer makeCopy() throws RuntimeException{ - int cookie = byteBuffer.getInt(initPosition); - if(getCookieBase(cookie) == V2CompressedEncodingCookieBase){ - try { - return ArrayHistogram.decodeFromCompressedByteBuffer(byteBuffer, 2); - } catch (DataFormatException e) { - throw new RuntimeException(e); - } - }else if(getCookieBase(cookie) == V2EncodingCookieBase){ - return ArrayHistogram.decodeFromByteBuffer(byteBuffer, 2); - } - throw new UnsupportedOperationException("unsupported method"); - } - - @Override - public byte[] toBytes() { - int size = byteBuffer.limit() - initPosition; - byte[] bytes = new byte[size]; - assert byteBuffer.order() == ByteOrder.BIG_ENDIAN; - int oldPosition = byteBuffer.position(); - byteBuffer.position(initPosition); - byteBuffer.get(bytes, 0, size); - byteBuffer.position(oldPosition); - return bytes; - } -} - +package org.HdrHistogram; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.List; +import java.util.Map; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; + +import static java.nio.ByteOrder.BIG_ENDIAN; + +/** + * 直接映射字节数组到Histogram,只读的Histogram,用于druid查询,减少gc减少计算,序列化后的是稀疏数组的形式 + */ +public class DirectMapHistogram implements Histogramer{ + static final int V2maxWordSizeInBytes = 9; // LEB128-64b9B + ZigZag require up to 9 bytes per word + static final int V2EncodingCookieBase = 0x1c849303; + static final int V2CompressedEncodingCookieBase = 0x1c849304; + + final ByteBuffer byteBuffer; + final int initPosition; + long totalCount; + + private DirectMapHistogram(ByteBuffer byteBuffer) { + int initPosition = byteBuffer.position(); + this.byteBuffer = byteBuffer; + this.initPosition = initPosition; + this.totalCount = -1; + } + + public static boolean byteBufferCanToDirectMapHistogram(ByteBuffer byteBuffer) { + int initPosition = byteBuffer.position(); + int cookie = byteBuffer.getInt(initPosition); + return getCookieBase(cookie) == V2EncodingCookieBase || getCookieBase(cookie) == V2CompressedEncodingCookieBase; + } + + public static DirectMapHistogram wrapBytes(byte[] bytes) { + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + return wrapByteBuffer(byteBuffer); + } + + public static DirectMapHistogram wrapByteBuffer(ByteBuffer byteBuffer) { + if(byteBufferCanToDirectMapHistogram(byteBuffer)){ + DirectMapHistogram hll = new DirectMapHistogram(byteBuffer); + return hll; + } + throw new RuntimeException("can not wrapByteBuffer"); + } + + public void mergeInto(AbstractHistogram histogram) throws Exception{ + int cookie = byteBuffer.getInt(initPosition); + if(getCookieBase(cookie) == V2CompressedEncodingCookieBase){ + final int lengthOfCompressedContents = byteBuffer.getInt(initPosition + 4); + final Inflater decompressor = new Inflater(); + + if (byteBuffer.hasArray()) { + decompressor.setInput(byteBuffer.array(), initPosition + 8, lengthOfCompressedContents); + } else { + byte[] compressedContents = new byte[lengthOfCompressedContents]; + byteBuffer.position(initPosition + 8); + try { + byteBuffer.get(compressedContents); + decompressor.setInput(compressedContents); + }finally { + byteBuffer.position(initPosition); + } + } + final int headerSize = 40; + final ByteBuffer headerBuffer = ByteBuffer.allocate(headerSize).order(BIG_ENDIAN); + decompressor.inflate(headerBuffer.array()); + + cookie = headerBuffer.getInt(); + final int payloadLengthInBytes; + final int normalizingIndexOffset; + final int numberOfSignificantValueDigits; + final long lowestTrackableUnitValue; + long highestTrackableValue; + final double integerToDoubleValueConversionRatio; + + assert getCookieBase(cookie) == V2EncodingCookieBase; + + payloadLengthInBytes = headerBuffer.getInt(4); + normalizingIndexOffset = headerBuffer.getInt(8); + numberOfSignificantValueDigits = headerBuffer.getInt( 12); + lowestTrackableUnitValue = headerBuffer.getLong(16); + highestTrackableValue = headerBuffer.getLong(24); + integerToDoubleValueConversionRatio = headerBuffer.getDouble(32); + + highestTrackableValue = Math.max(highestTrackableValue, 2); + + final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits); + final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue)/Math.log(2)); + final long unitMagnitudeMask = (1 << unitMagnitude) - 1; + int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution)/Math.log(2)); + final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1; + final int subBucketCount = 1 << subBucketCountMagnitude; + final int subBucketHalfCount = subBucketCount / 2; + final long subBucketMask = ((long)subBucketCount - 1) << unitMagnitude; + if (subBucketCountMagnitude + unitMagnitude > 62) { + // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long. + // Technically it still sort of works if their sum is 63: you can represent all but the last number + // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here + // fits in 62 bits is debatable, and it makes it harder to work through the logic. + // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative. + throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " + + "beyond lowestDiscernibleValue"); + } + + final int expectedCapacity = payloadLengthInBytes; + + ByteBuffer sourceBuffer = ByteBuffer.allocate(expectedCapacity).order(BIG_ENDIAN); + int decompressedByteCount = decompressor.inflate(sourceBuffer.array()); + decompressor.end(); // 必须手动调用,否则快速调用可能内存溢出(堆外内存) + if ((payloadLengthInBytes != Integer.MAX_VALUE) && (decompressedByteCount < payloadLengthInBytes)) { + throw new IllegalArgumentException("The buffer does not contain the indicated payload amount"); + } + assert decompressedByteCount == expectedCapacity; + + int dstIndex = 0; + int endPosition = sourceBuffer.position() + expectedCapacity; //期望的结束读取的索引 + while (sourceBuffer.position() < endPosition) { + long count; + int zerosCount = 0; + // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes): + count = ZigZagEncoding.getLong(sourceBuffer); + if (count < 0) { + long zc = -count; // 0值的连续个数 + if (zc > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + "An encoded zero count of > Integer.MAX_VALUE was encountered in the source"); + } + zerosCount = (int) zc; + } + if (zerosCount > 0) { + dstIndex += zerosCount; // No need to set zeros in array. Just skip them. + } else { + // 单个非连续的0也会被输出 + if(count > 0){ + long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude); + histogram.recordValueWithCount(value, count); + } + dstIndex++; + } + } + + }else if(getCookieBase(cookie) == V2EncodingCookieBase){ + final int payloadLengthInBytes; + final int normalizingIndexOffset; + final int numberOfSignificantValueDigits; + final long lowestTrackableUnitValue; + long highestTrackableValue; + final double integerToDoubleValueConversionRatio; + + payloadLengthInBytes = byteBuffer.getInt(initPosition + 4); + normalizingIndexOffset = byteBuffer.getInt(initPosition + 8); + numberOfSignificantValueDigits = byteBuffer.getInt(initPosition + 12); + lowestTrackableUnitValue = byteBuffer.getLong(initPosition + 16); + highestTrackableValue = byteBuffer.getLong(initPosition + 24); + integerToDoubleValueConversionRatio = byteBuffer.getDouble(initPosition + 32); + + highestTrackableValue = Math.max(highestTrackableValue, 2); + + final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits); + final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue)/Math.log(2)); + final long unitMagnitudeMask = (1 << unitMagnitude) - 1; + int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution)/Math.log(2)); + final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1; + final int subBucketCount = 1 << subBucketCountMagnitude; + final int subBucketHalfCount = subBucketCount / 2; + final long subBucketMask = ((long)subBucketCount - 1) << unitMagnitude; + if (subBucketCountMagnitude + unitMagnitude > 62) { + // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long. + // Technically it still sort of works if their sum is 63: you can represent all but the last number + // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here + // fits in 62 bits is debatable, and it makes it harder to work through the logic. + // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative. + throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " + + "beyond lowestDiscernibleValue"); + } + + final int expectedCapacity =payloadLengthInBytes; + assert expectedCapacity == payloadLengthInBytes; + if(expectedCapacity > byteBuffer.limit() - 40){ + throw new IllegalArgumentException("The buffer does not contain the full Histogram payload"); + } + final int position = initPosition + 40; + final int lengthInBytes = expectedCapacity; + final int wordSizeInBytes = V2maxWordSizeInBytes; + // fillCountsArrayFromSourceBuffer + + ByteBuffer sourceBuffer = byteBuffer.duplicate(); + sourceBuffer.position(position); + final long maxAllowableCountInHistigram = Long.MAX_VALUE; + int dstIndex = 0; + int endPosition = sourceBuffer.position() + lengthInBytes; //期望的结束读取的索引 + while (sourceBuffer.position() < endPosition) { + long count; + int zerosCount = 0; + // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes): + count = ZigZagEncoding.getLong(sourceBuffer); + if (count < 0) { + long zc = -count; // 0值的连续个数 + if (zc > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + "An encoded zero count of > Integer.MAX_VALUE was encountered in the source"); + } + zerosCount = (int) zc; + } + if (zerosCount > 0) { + dstIndex += zerosCount; // No need to set zeros in array. Just skip them. + } else { + // 单个非连续的0也会被输出 + if(count > 0){ + long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude); + histogram.recordValueWithCount(value, count); + } + dstIndex++; + } + } + }else{ + throw new RuntimeException("can not wrapByteBuffer"); + } + } + + final long valueFromIndex(final int index, int subBucketHalfCountMagnitude, int subBucketHalfCount, int unitMagnitude) { + int bucketIndex = (index >> subBucketHalfCountMagnitude) - 1; + int subBucketIndex = (index & (subBucketHalfCount - 1)) + subBucketHalfCount; + if (bucketIndex < 0) { + subBucketIndex -= subBucketHalfCount; + bucketIndex = 0; + } + return valueFromIndex(bucketIndex, subBucketIndex, unitMagnitude); + } + + private long valueFromIndex(final int bucketIndex, final int subBucketIndex, int unitMagnitude) { + return ((long) subBucketIndex) << (bucketIndex + unitMagnitude); + } + + static int getCookieBase(final int cookie) { + return (cookie & ~0xf0); + } + + @Override + public long getTotalCount() { + if(totalCount >= 0){ + return totalCount; + } + try { + totalCount = 0; + int cookie = byteBuffer.getInt(initPosition); + if(getCookieBase(cookie) == V2CompressedEncodingCookieBase){ + final int lengthOfCompressedContents = byteBuffer.getInt(initPosition + 4); + final Inflater decompressor = new Inflater(); + + if (byteBuffer.hasArray()) { + decompressor.setInput(byteBuffer.array(), initPosition + 8, lengthOfCompressedContents); + } else { + byte[] compressedContents = new byte[lengthOfCompressedContents]; + byteBuffer.position(initPosition + 8); + try { + byteBuffer.get(compressedContents); + decompressor.setInput(compressedContents); + }finally { + byteBuffer.position(initPosition); + } + } + final int headerSize = 40; + final ByteBuffer headerBuffer = ByteBuffer.allocate(headerSize).order(BIG_ENDIAN); + decompressor.inflate(headerBuffer.array()); + + cookie = headerBuffer.getInt(); + final int payloadLengthInBytes; + final int normalizingIndexOffset; + final int numberOfSignificantValueDigits; + final long lowestTrackableUnitValue; + long highestTrackableValue; + final double integerToDoubleValueConversionRatio; + + assert getCookieBase(cookie) == V2EncodingCookieBase; + + payloadLengthInBytes = headerBuffer.getInt(4); + normalizingIndexOffset = headerBuffer.getInt(8); + numberOfSignificantValueDigits = headerBuffer.getInt( 12); + lowestTrackableUnitValue = headerBuffer.getLong(16); + highestTrackableValue = headerBuffer.getLong(24); + integerToDoubleValueConversionRatio = headerBuffer.getDouble(32); + + highestTrackableValue = Math.max(highestTrackableValue, 2); + + final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits); + final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue)/Math.log(2)); + final long unitMagnitudeMask = (1 << unitMagnitude) - 1; + int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution)/Math.log(2)); + final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1; + final int subBucketCount = 1 << subBucketCountMagnitude; + final int subBucketHalfCount = subBucketCount / 2; + final long subBucketMask = ((long)subBucketCount - 1) << unitMagnitude; + if (subBucketCountMagnitude + unitMagnitude > 62) { + // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long. + // Technically it still sort of works if their sum is 63: you can represent all but the last number + // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here + // fits in 62 bits is debatable, and it makes it harder to work through the logic. + // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative. + throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " + + "beyond lowestDiscernibleValue"); + } + + final int expectedCapacity = payloadLengthInBytes; + + ByteBuffer sourceBuffer = ByteBuffer.allocate(expectedCapacity).order(BIG_ENDIAN); + int decompressedByteCount = decompressor.inflate(sourceBuffer.array()); + decompressor.end(); // 必须手动调用,否则快速调用可能内存溢出(堆外内存) + if ((payloadLengthInBytes != Integer.MAX_VALUE) && (decompressedByteCount < payloadLengthInBytes)) { + throw new IllegalArgumentException("The buffer does not contain the indicated payload amount"); + } + assert decompressedByteCount == expectedCapacity; + + int dstIndex = 0; + int endPosition = sourceBuffer.position() + expectedCapacity; //期望的结束读取的索引 + while (sourceBuffer.position() < endPosition) { + long count; + int zerosCount = 0; + // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes): + count = ZigZagEncoding.getLong(sourceBuffer); + if (count < 0) { + long zc = -count; // 0值的连续个数 + if (zc > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + "An encoded zero count of > Integer.MAX_VALUE was encountered in the source"); + } + zerosCount = (int) zc; + } + if (zerosCount > 0) { + dstIndex += zerosCount; // No need to set zeros in array. Just skip them. + } else { + // 单个非连续的0也会被输出 + if(count > 0){ + //long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude); + //histogram.recordValueWithCount(value, count); + totalCount += count; + } + dstIndex++; + } + } + return totalCount; + }else if(getCookieBase(cookie) == V2EncodingCookieBase){ + final int payloadLengthInBytes; + final int normalizingIndexOffset; + final int numberOfSignificantValueDigits; + final long lowestTrackableUnitValue; + long highestTrackableValue; + final double integerToDoubleValueConversionRatio; + + payloadLengthInBytes = byteBuffer.getInt(initPosition + 4); + normalizingIndexOffset = byteBuffer.getInt(initPosition + 8); + numberOfSignificantValueDigits = byteBuffer.getInt(initPosition + 12); + lowestTrackableUnitValue = byteBuffer.getLong(initPosition + 16); + highestTrackableValue = byteBuffer.getLong(initPosition + 24); + integerToDoubleValueConversionRatio = byteBuffer.getDouble(initPosition + 32); + + highestTrackableValue = Math.max(highestTrackableValue, 2); + + final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits); + final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue)/Math.log(2)); + final long unitMagnitudeMask = (1 << unitMagnitude) - 1; + int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution)/Math.log(2)); + final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1; + final int subBucketCount = 1 << subBucketCountMagnitude; + final int subBucketHalfCount = subBucketCount / 2; + final long subBucketMask = ((long)subBucketCount - 1) << unitMagnitude; + if (subBucketCountMagnitude + unitMagnitude > 62) { + // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long. + // Technically it still sort of works if their sum is 63: you can represent all but the last number + // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here + // fits in 62 bits is debatable, and it makes it harder to work through the logic. + // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative. + throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " + + "beyond lowestDiscernibleValue"); + } + + final int expectedCapacity =payloadLengthInBytes; + assert expectedCapacity == payloadLengthInBytes; + if(expectedCapacity > byteBuffer.limit() - 40){ + throw new IllegalArgumentException("The buffer does not contain the full Histogram payload"); + } + final int position = initPosition + 40; + final int lengthInBytes = expectedCapacity; + final int wordSizeInBytes = V2maxWordSizeInBytes; + // fillCountsArrayFromSourceBuffer + + ByteBuffer sourceBuffer = byteBuffer.duplicate(); + sourceBuffer.position(position); + final long maxAllowableCountInHistigram = Long.MAX_VALUE; + int dstIndex = 0; + int endPosition = sourceBuffer.position() + lengthInBytes; //期望的结束读取的索引 + while (sourceBuffer.position() < endPosition) { + long count; + int zerosCount = 0; + // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes): + count = ZigZagEncoding.getLong(sourceBuffer); + if (count < 0) { + long zc = -count; // 0值的连续个数 + if (zc > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + "An encoded zero count of > Integer.MAX_VALUE was encountered in the source"); + } + zerosCount = (int) zc; + } + if (zerosCount > 0) { + dstIndex += zerosCount; // No need to set zeros in array. Just skip them. + } else { + // 单个非连续的0也会被输出 + if(count > 0){ + //long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude); + //histogram.recordValueWithCount(value, count); + totalCount += count; + } + dstIndex++; + } + } + return totalCount; + }else{ + throw new UnsupportedOperationException("unsupported method"); + } + } catch (DataFormatException e) { + throw new RuntimeException(e); + } + } + + @Override + public void recordValue(long value) throws RuntimeException { + throw new UnsupportedOperationException("unsupported method"); + } + + @Override + public void recordValueWithCount(long value, long count) throws RuntimeException { + throw new UnsupportedOperationException("unsupported method"); + } + + @Override + public long getValueAtPercentile(double percentile) { + throw new UnsupportedOperationException("unsupported method"); + } + + @Override + public List percentileList(int percentileTicksPerHalfDistance) { + throw new UnsupportedOperationException("unsupported method"); + } + + @Override + public Map describe() { + throw new UnsupportedOperationException("unsupported method"); + } + + @Override + public Histogramer resetHistogram() { + throw new UnsupportedOperationException("unsupported method"); + } + + @Override + public Histogramer merge(Histogramer histogram) { + throw new UnsupportedOperationException("unsupported method"); + } + + @Override + public Histogramer makeCopy() throws RuntimeException{ + int cookie = byteBuffer.getInt(initPosition); + if(getCookieBase(cookie) == V2CompressedEncodingCookieBase){ + try { + return ArrayHistogram.decodeFromCompressedByteBuffer(byteBuffer, 2); + } catch (DataFormatException e) { + throw new RuntimeException(e); + } + }else if(getCookieBase(cookie) == V2EncodingCookieBase){ + return ArrayHistogram.decodeFromByteBuffer(byteBuffer, 2); + } + throw new UnsupportedOperationException("unsupported method"); + } + + @Override + public byte[] toBytes() { + int size = byteBuffer.limit() - initPosition; + byte[] bytes = new byte[size]; + assert byteBuffer.order() == ByteOrder.BIG_ENDIAN; + int oldPosition = byteBuffer.position(); + byteBuffer.position(initPosition); + byteBuffer.get(bytes, 0, size); + byteBuffer.position(oldPosition); + return bytes; + } +} + diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/HistogramSketch.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/HistogramSketch.java index 569df20..b527e75 100644 --- a/druid-hdrhistogram/src/main/java/org/HdrHistogram/HistogramSketch.java +++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/HistogramSketch.java @@ -1,85 +1,90 @@ -package org.HdrHistogram; - -import java.nio.ByteBuffer; -import java.util.List; - -public class HistogramSketch { - public Histogramer hisImpl = null; - - public HistogramSketch(final int numberOfSignificantValueDigits){ - hisImpl = new ArrayHistogram(numberOfSignificantValueDigits); - } - - public HistogramSketch(final long lowestDiscernibleValue, final long highestTrackableValue, - final int numberOfSignificantValueDigits, final boolean autoResize){ - ArrayHistogram histogram = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); - histogram.setAutoResize(autoResize); - hisImpl = histogram; - } - - public HistogramSketch(final Histogramer that) { - hisImpl = that; - } - - /** - * Copy constructor used by copy(). - */ - HistogramSketch(final HistogramSketch that) { - hisImpl = that.hisImpl.makeCopy(); - } - - /** - * 复制hisImpl到堆内存实例hisImpl - */ - public HistogramSketch copy() { - return new HistogramSketch(this); - } - - public void reset() { - hisImpl = hisImpl.resetHistogram(); - } - - public long getTotalCount(){ - return hisImpl.getTotalCount(); - } - - public void recordValue(long value){ - hisImpl.recordValue(value); - } - - public void recordValueWithCount(long value, long count){ - hisImpl.recordValueWithCount(value, count); - } - - public long getValueAtPercentile(double percentile){ - return hisImpl.getValueAtPercentile(percentile); - } - - public List percentileList(int percentileTicksPerHalfDistance){ - return hisImpl.percentileList(percentileTicksPerHalfDistance); - } - - public static final int getUpdatableSerializationBytes(long lowestDiscernibleValue, long highestTrackableValue, int numberOfSignificantValueDigits){ - return DirectArrayHistogram.getUpdatableSerializationBytes(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); - } - - public byte[] toBytes() { - return hisImpl.toBytes(); - } - - public static HistogramSketch fromBytes(byte[] bytes) { - return new HistogramSketch(ArrayHistogram.fromBytes(bytes)); - } - - public static HistogramSketch fromByteBuffer(ByteBuffer byteBuffer) { - return new HistogramSketch(ArrayHistogram.fromByteBuffer(byteBuffer)); - } - - public static HistogramSketch wrapBytes(byte[] bytes) { - return new HistogramSketch(DirectMapHistogram.wrapBytes(bytes)); - } - - public static HistogramSketch wrapByteBuffer(ByteBuffer byteBuffer) { - return new HistogramSketch(DirectMapHistogram.wrapByteBuffer(byteBuffer)); - } -} +package org.HdrHistogram; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +public class HistogramSketch { + public Histogramer hisImpl = null; + + public HistogramSketch(final int numberOfSignificantValueDigits){ + hisImpl = new ArrayHistogram(numberOfSignificantValueDigits); + } + + public HistogramSketch(final long lowestDiscernibleValue, final long highestTrackableValue, + final int numberOfSignificantValueDigits, final boolean autoResize){ + ArrayHistogram histogram = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); + histogram.setAutoResize(autoResize); + hisImpl = histogram; + } + + public HistogramSketch(final Histogramer that) { + hisImpl = that; + } + + /** + * Copy constructor used by copy(). + */ + HistogramSketch(final HistogramSketch that) { + hisImpl = that.hisImpl.makeCopy(); + } + + /** + * 复制hisImpl到堆内存实例hisImpl + */ + public HistogramSketch copy() { + return new HistogramSketch(this); + } + + public void reset() { + hisImpl = hisImpl.resetHistogram(); + } + + public long getTotalCount(){ + return hisImpl.getTotalCount(); + } + + public void recordValue(long value){ + hisImpl.recordValue(value); + } + + public void recordValueWithCount(long value, long count){ + hisImpl.recordValueWithCount(value, count); + } + + public long getValueAtPercentile(double percentile){ + return hisImpl.getValueAtPercentile(percentile); + } + + public List percentileList(int percentileTicksPerHalfDistance){ + return hisImpl.percentileList(percentileTicksPerHalfDistance); + } + + public Map describe(){ + return hisImpl.describe(); + } + + public static final int getUpdatableSerializationBytes(long lowestDiscernibleValue, long highestTrackableValue, int numberOfSignificantValueDigits){ + return DirectArrayHistogram.getUpdatableSerializationBytes(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); + } + + public byte[] toBytes() { + return hisImpl.toBytes(); + } + + public static HistogramSketch fromBytes(byte[] bytes) { + return new HistogramSketch(ArrayHistogram.fromBytes(bytes)); + } + + public static HistogramSketch fromByteBuffer(ByteBuffer byteBuffer) { + return new HistogramSketch(ArrayHistogram.fromByteBuffer(byteBuffer)); + } + + public static HistogramSketch wrapBytes(byte[] bytes) { + return new HistogramSketch(DirectMapHistogram.wrapBytes(bytes)); + } + + public static HistogramSketch wrapByteBuffer(ByteBuffer byteBuffer) { + return new HistogramSketch(DirectMapHistogram.wrapByteBuffer(byteBuffer)); + } +} diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/Histogramer.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/Histogramer.java index 2c4ec3a..4901b1d 100644 --- a/druid-hdrhistogram/src/main/java/org/HdrHistogram/Histogramer.java +++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/Histogramer.java @@ -1,34 +1,37 @@ -package org.HdrHistogram; - -import java.nio.ByteBuffer; -import java.util.List; - -public interface Histogramer { - long getTotalCount(); - - void recordValue(long value) throws RuntimeException; - - void recordValueWithCount(long value, long count) throws RuntimeException; - - long getValueAtPercentile(double percentile); - - List percentileList(int percentileTicksPerHalfDistance); - - Histogramer resetHistogram(); - - Histogramer merge(Histogramer histogram); - - // 复制到堆内存实例ArrayHistogram - Histogramer makeCopy(); - - byte[] toBytes(); - - default byte[] byteBuffer2Bytes(ByteBuffer byteBuffer){ - //必须调用完后flip()才可以调用此方法 - byteBuffer.flip(); - int len = byteBuffer.limit() - byteBuffer.position(); - byte[] bytes = new byte[len]; - byteBuffer.get(bytes); - return bytes; - } -} +package org.HdrHistogram; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +public interface Histogramer { + long getTotalCount(); + + void recordValue(long value) throws RuntimeException; + + void recordValueWithCount(long value, long count) throws RuntimeException; + + long getValueAtPercentile(double percentile); + + List percentileList(int percentileTicksPerHalfDistance); + + Map describe(); + + Histogramer resetHistogram(); + + Histogramer merge(Histogramer histogram); + + // 复制到堆内存实例ArrayHistogram + Histogramer makeCopy(); + + byte[] toBytes(); + + default byte[] byteBuffer2Bytes(ByteBuffer byteBuffer){ + //必须调用完后flip()才可以调用此方法 + byteBuffer.flip(); + int len = byteBuffer.limit() - byteBuffer.position(); + byte[] bytes = new byte[len]; + byteBuffer.get(bytes); + return bytes; + } +} diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/Percentile.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/Percentile.java index 6b7be13..ad70ca5 100644 --- a/druid-hdrhistogram/src/main/java/org/HdrHistogram/Percentile.java +++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/Percentile.java @@ -1,41 +1,50 @@ -package org.HdrHistogram; - -public class Percentile { - public long value; - public long count; - public double percentile; - - public Percentile() { - - } - - public Percentile(long value, long count, double percentile) { - this.value = value; - this.count = count; - this.percentile = percentile; - } - - public long getValue() { - return value; - } - - public void setValue(long value) { - this.value = value; - } - - public long getCount() { - return count; - } - - public void setCount(long count) { - this.count = count; - } - - public double getPercentile() { - return percentile; - } - - public void setPercentile(double percentile) { - this.percentile = percentile; - } -} +package org.HdrHistogram; + +public class Percentile { + public long value; + public long count; + public double percentile; + + public Percentile() { + + } + + public Percentile(long value, long count, double percentile) { + this.value = value; + this.count = count; + this.percentile = percentile; + } + + public long getValue() { + return value; + } + + public void setValue(long value) { + this.value = value; + } + + public long getCount() { + return count; + } + + public void setCount(long count) { + this.count = count; + } + + public double getPercentile() { + return percentile; + } + + public void setPercentile(double percentile) { + this.percentile = percentile; + } + + @Override + public String toString() { + return "Percentile{" + + "value=" + value + + ", count=" + count + + ", percentile=" + percentile + + '}'; + } +} diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java index fd365b8..869a23f 100644 --- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java @@ -21,7 +21,7 @@ public class HdrHistogramAggregatorFactory extends AggregatorFactory { public static final long DEFAULT_HIGHEST = 2; public static final int DEFAULT_SIGNIFICANT = 1; public static final boolean DEFAULT_AUTO_RESIZE = true; - public static final long BUFFER_AUTO_RESIZE_HIGHEST = 100000000L * 1000000L; + public static final long BUFFER_AUTO_RESIZE_HIGHEST = 100000000L * 100L; public static final Comparator COMPARATOR = Comparator.nullsFirst(Comparator.comparingLong(HistogramSketch::getTotalCount)); diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeBufferAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeBufferAggregator.java index 8cdb7f0..ef5c6d2 100644 --- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeBufferAggregator.java +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeBufferAggregator.java @@ -1,134 +1,134 @@ -package org.apache.druid.query.aggregation.sketch.HdrHistogram; - -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import org.HdrHistogram.*; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseObjectColumnValueSelector; - -import javax.annotation.Nullable; -import java.nio.ByteBuffer; -import java.util.IdentityHashMap; - -public class HdrHistogramMergeBufferAggregator implements BufferAggregator { - private static final Logger LOG = new Logger(HdrHistogramAggregator.class); - private long lastTs = 0L; - private final BaseObjectColumnValueSelector selector; - private final long lowestDiscernibleValue; - private final long highestTrackableValue; - private final int numberOfSignificantValueDigits; - private final boolean autoResize; - private final int size; - private final IdentityHashMap> histograms = new IdentityHashMap<>(); - - public HdrHistogramMergeBufferAggregator( - BaseObjectColumnValueSelector selector, - long lowestDiscernibleValue, - long highestTrackableValue, - int numberOfSignificantValueDigits, - boolean autoResize, - int size - ) { - this.selector = selector; - this.lowestDiscernibleValue = lowestDiscernibleValue; - this.highestTrackableValue = highestTrackableValue; - this.numberOfSignificantValueDigits = numberOfSignificantValueDigits; - this.autoResize = autoResize; - this.size = size; - LOG.error("HdrHistogramMergeBufferAggregator gene:" + Thread.currentThread().getName() + "-" + Thread.currentThread().getId()); - } - - @Override - public synchronized void init(ByteBuffer buf, int position) { - final int oldPosition = buf.position(); - try { - buf.position(position); - - long highest = autoResize?HdrHistogramAggregatorFactory.BUFFER_AUTO_RESIZE_HIGHEST: highestTrackableValue; - final DirectArrayHistogram histogram = new DirectArrayHistogram(lowestDiscernibleValue, highest, numberOfSignificantValueDigits, buf); - histogram.reset(); - HistogramUnion union = new HistogramUnion(new HistogramSketch(histogram)); - putUnion(buf, position, union); - }finally { - buf.position(oldPosition); - } - } - - @Override - public synchronized void aggregate(ByteBuffer buf, int position) { - /*long ts = System.currentTimeMillis(); - if(ts - lastTs > 2000){ - //LOG.warn("HdrHistogramMergeBufferAggregator call"); - LOG.error("HdrHistogramMergeBufferAggregator call"); - lastTs = ts; - }*/ - HistogramSketch h = selector.getObject(); - if (h == null) { - return; - } - - final int oldPosition = buf.position(); - try { - buf.position(position); - - HistogramUnion union = histograms.get(buf).get(position); - union.update(h); - }finally{ - buf.position(oldPosition); - } - } - - @Nullable - @Override - public synchronized HistogramSketch get(ByteBuffer buf, int position) { - LOG.error("HdrHistogramMergeBufferAggregator get:" + 0 + "-" + Thread.currentThread().getId() + "-" + this); - HistogramUnion union = histograms.get(buf).get(position); - //return histogram.copy(); - return union.getResult().copy(); - } - - @Override - public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) { - HistogramUnion union = histograms.get(oldBuffer).get(oldPosition); - - Int2ObjectMap map = histograms.get(oldBuffer); - map.remove(oldPosition); - if (map.isEmpty()) { - histograms.remove(oldBuffer); - } - - try { - newBuffer.position(newPosition); - union.resetByteBuffer(newBuffer); - putUnion(newBuffer, newPosition, union); - }finally { - newBuffer.position(newPosition); - } - } - - private void putUnion(final ByteBuffer buffer, final int position, final HistogramUnion union) { - Int2ObjectMap map = histograms.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>()); - map.put(position, union); - } - @Override - public float getFloat(ByteBuffer buf, int position) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public long getLong(ByteBuffer buf, int position) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public void close() { - - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector){ - inspector.visit("selector", selector); - } -} +package org.apache.druid.query.aggregation.sketch.HdrHistogram; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.HdrHistogram.*; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.IdentityHashMap; + +public class HdrHistogramMergeBufferAggregator implements BufferAggregator { + private static final Logger LOG = new Logger(HdrHistogramAggregator.class); + private long lastTs = 0L; + private final BaseObjectColumnValueSelector selector; + private final long lowestDiscernibleValue; + private final long highestTrackableValue; + private final int numberOfSignificantValueDigits; + private final boolean autoResize; + private final int size; + private final IdentityHashMap> histograms = new IdentityHashMap<>(); + + public HdrHistogramMergeBufferAggregator( + BaseObjectColumnValueSelector selector, + long lowestDiscernibleValue, + long highestTrackableValue, + int numberOfSignificantValueDigits, + boolean autoResize, + int size + ) { + this.selector = selector; + this.lowestDiscernibleValue = lowestDiscernibleValue; + this.highestTrackableValue = highestTrackableValue; + this.numberOfSignificantValueDigits = numberOfSignificantValueDigits; + this.autoResize = autoResize; + this.size = size; + //LOG.error("HdrHistogramMergeBufferAggregator gene:" + Thread.currentThread().getName() + "-" + Thread.currentThread().getId()); + } + + @Override + public synchronized void init(ByteBuffer buf, int position) { + final int oldPosition = buf.position(); + try { + buf.position(position); + + long highest = autoResize?HdrHistogramAggregatorFactory.BUFFER_AUTO_RESIZE_HIGHEST: highestTrackableValue; + final DirectArrayHistogram histogram = new DirectArrayHistogram(lowestDiscernibleValue, highest, numberOfSignificantValueDigits, buf); + histogram.reset(); + HistogramUnion union = new HistogramUnion(new HistogramSketch(histogram)); + putUnion(buf, position, union); + }finally { + buf.position(oldPosition); + } + } + + @Override + public synchronized void aggregate(ByteBuffer buf, int position) { + /*long ts = System.currentTimeMillis(); + if(ts - lastTs > 2000){ + //LOG.warn("HdrHistogramMergeBufferAggregator call"); + LOG.error("HdrHistogramMergeBufferAggregator call"); + lastTs = ts; + }*/ + HistogramSketch h = selector.getObject(); + if (h == null) { + return; + } + + final int oldPosition = buf.position(); + try { + buf.position(position); + + HistogramUnion union = histograms.get(buf).get(position); + union.update(h); + }finally{ + buf.position(oldPosition); + } + } + + @Nullable + @Override + public synchronized HistogramSketch get(ByteBuffer buf, int position) { + //LOG.error("HdrHistogramMergeBufferAggregator get:" + 0 + "-" + Thread.currentThread().getId() + "-" + this); + HistogramUnion union = histograms.get(buf).get(position); + //return histogram.copy(); + return union.getResult().copy(); + } + + @Override + public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) { + HistogramUnion union = histograms.get(oldBuffer).get(oldPosition); + + Int2ObjectMap map = histograms.get(oldBuffer); + map.remove(oldPosition); + if (map.isEmpty()) { + histograms.remove(oldBuffer); + } + + try { + newBuffer.position(newPosition); + union.resetByteBuffer(newBuffer); + putUnion(newBuffer, newPosition, union); + }finally { + newBuffer.position(newPosition); + } + } + + private void putUnion(final ByteBuffer buffer, final int position, final HistogramUnion union) { + Int2ObjectMap map = histograms.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>()); + map.put(position, union); + } + @Override + public float getFloat(ByteBuffer buf, int position) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public long getLong(ByteBuffer buf, int position) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void close() { + + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector){ + inspector.visit("selector", selector); + } +} diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramModule.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramModule.java index 5041965..cf09046 100644 --- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramModule.java +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramModule.java @@ -9,10 +9,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.inject.Binder; import org.HdrHistogram.HistogramSketch; import org.apache.druid.initialization.DruidModule; -import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramObjectSqlAggregator; -import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramPercentilesOperatorConversion; -import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramQuantileSqlAggregator; -import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramQuantilesOperatorConversion; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.*; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.sql.guice.SqlBindings; @@ -27,6 +24,8 @@ public class HdrHistogramModule implements DruidModule { public static final byte QUANTILES_HDRHISTOGRAM_TO_QUANTILE_CACHE_TYPE_ID = 0x03; public static final byte QUANTILES_HDRHISTOGRAM_TO_QUANTILES_CACHE_TYPE_ID = 0x04; public static final byte QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_CACHE_TYPE_ID = 0x05; + public static final byte QUANTILES_HDRHISTOGRAM_TO_DESCRIBE_CACHE_TYPE_ID = 0x06; + public static final byte QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_DESCRIBE_CACHE_TYPE_ID = 0x07; public static final String HDRHISTOGRAM_TYPE_NAME = "HdrHistogramSketch"; public static final ColumnType TYPE = ColumnType.ofComplex(HDRHISTOGRAM_TYPE_NAME); @@ -50,6 +49,8 @@ public class HdrHistogramModule implements DruidModule { SqlBindings.addOperatorConversion(binder, HdrHistogramQuantilesOperatorConversion.class); SqlBindings.addOperatorConversion(binder, HdrHistogramPercentilesOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, HdrHistogramDescribeOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, HdrHistogramPercentilesDescribeOperatorConversion.class); } @Override @@ -61,7 +62,9 @@ public class HdrHistogramModule implements DruidModule { new NamedType(HdrHistogramMergeAggregatorFactory.class, "HdrHistogramSketchMerge"), new NamedType(HdrHistogramToQuantilePostAggregator.class, "HdrHistogramSketchToQuantile"), new NamedType(HdrHistogramToQuantilesPostAggregator.class, "HdrHistogramSketchToQuantiles"), - new NamedType(HdrHistogramToPercentilesPostAggregator.class, "HdrHistogramSketchToPercentiles") + new NamedType(HdrHistogramToPercentilesPostAggregator.class, "HdrHistogramSketchToPercentiles"), + new NamedType(HdrHistogramToDescribePostAggregator.class, "HdrHistogramSketchToDescribe"), + new NamedType(HdrHistogramToPercentilesDescribePostAggregator.class, "HdrHistogramSketchToPercentilesDescription") ).addSerializer(HistogramSketch.class, new HistogramJsonSerializer()) ); } diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToDescribePostAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToDescribePostAggregator.java new file mode 100644 index 0000000..1a70dc3 --- /dev/null +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToDescribePostAggregator.java @@ -0,0 +1,108 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Sets; +import org.HdrHistogram.HistogramSketch; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnType; + +import javax.annotation.Nullable; +import java.util.*; + +public class HdrHistogramToDescribePostAggregator implements PostAggregator { + private final String name; + private final String fieldName; + + @JsonCreator + public HdrHistogramToDescribePostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName + ){ + this.name = name; + this.fieldName = fieldName; + } + + @Override + public ColumnType getType(ColumnInspector signature){ + return ColumnType.STRING; + } + + @Override + @JsonProperty + public String getName() { + return name; + } + + @JsonProperty + public String getFieldName() { + return fieldName; + } + + @Nullable + @Override + public Object compute(Map values) { + HistogramSketch histogram = (HistogramSketch) values.get(fieldName); + if(histogram == null){ + return "{}"; //"[]" + } + return HdrHistogramModule.toJson(histogram.describe()); + } + + @Override + public Comparator getComparator() + { + throw new IAE("Comparing arrays of quantiles is not supported"); + } + + @Override + public Set getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public PostAggregator decorate(Map aggregators) { + return this; + } + + @Override + public byte[] getCacheKey() { + CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_DESCRIBE_CACHE_TYPE_ID) + .appendString(fieldName); + return builder.build(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HdrHistogramToDescribePostAggregator that = (HdrHistogramToDescribePostAggregator) o; + + return name.equals(that.name) && + fieldName.equals(that.fieldName); + } + + @Override + public int hashCode() { + return Objects.hash(name, fieldName); + } + + @Override + public String toString() { + return "HdrHistogramToDescribePostAggregator{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + '}'; + } + + +} diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesDescribePostAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesDescribePostAggregator.java new file mode 100644 index 0000000..b2808b0 --- /dev/null +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesDescribePostAggregator.java @@ -0,0 +1,125 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Sets; +import org.HdrHistogram.HistogramSketch; +import org.HdrHistogram.Percentile; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnType; + +import javax.annotation.Nullable; +import java.util.*; + +public class HdrHistogramToPercentilesDescribePostAggregator implements PostAggregator { + private final String name; + private final String fieldName; + private final int percentileTicksPerHalfDistance; + + @JsonCreator + public HdrHistogramToPercentilesDescribePostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("percentileTicksPerHalfDistance") int percentileTicksPerHalfDistance + ){ + this.name = name; + this.fieldName = fieldName; + this.percentileTicksPerHalfDistance = percentileTicksPerHalfDistance; + } + + @Override + public ColumnType getType(ColumnInspector signature){ + return ColumnType.STRING; + } + + @Override + @JsonProperty + public String getName() { + return name; + } + + @JsonProperty + public String getFieldName() { + return fieldName; + } + + @JsonProperty + public int getPercentileTicksPerHalfDistance() { + return percentileTicksPerHalfDistance; + } + + @Nullable + @Override + public Object compute(Map values) { + HistogramSketch histogram = (HistogramSketch) values.get(fieldName); + if(histogram == null){ + return "{\"percentiles\":[],\"describe\":{}}"; + } + List percentiles = histogram.percentileList(percentileTicksPerHalfDistance); + Map describe = histogram.describe(); + Map rst = new LinkedHashMap<>(); + rst.put("percentiles", percentiles); + rst.put("description", describe); + return HdrHistogramModule.toJson(rst); + } + + @Override + public Comparator getComparator() + { + throw new IAE("Comparing object is not supported"); + } + + @Override + public Set getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public PostAggregator decorate(Map aggregators) { + return this; + } + + @Override + public byte[] getCacheKey() { + CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_DESCRIBE_CACHE_TYPE_ID) + .appendString(fieldName); + builder.appendInt(percentileTicksPerHalfDistance); + return builder.build(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HdrHistogramToPercentilesDescribePostAggregator that = (HdrHistogramToPercentilesDescribePostAggregator) o; + + return percentileTicksPerHalfDistance == that.percentileTicksPerHalfDistance && + name.equals(that.name) && + fieldName.equals(that.fieldName); + } + + @Override + public int hashCode() { + return Objects.hash(name, fieldName, percentileTicksPerHalfDistance); + } + + @Override + public String toString() { + return "HdrHistogramToPercentilesDescribePostAggregator{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + ", probabilitys=" + percentileTicksPerHalfDistance + + '}'; + } + + +} diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramDescribeOperatorConversion.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramDescribeOperatorConversion.java new file mode 100644 index 0000000..4779d2b --- /dev/null +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramDescribeOperatorConversion.java @@ -0,0 +1,77 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram.sql; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramToDescribePostAggregator; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.OperatorConversions; +import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; +import org.apache.druid.sql.calcite.planner.PlannerContext; + +import javax.annotation.Nullable; +import java.util.List; + +public class HdrHistogramDescribeOperatorConversion implements SqlOperatorConversion { + private static final String FUNCTION_NAME = "HDR_DESCRIBE"; + private static final SqlFunction SQL_FUNCTION = OperatorConversions + .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME)) + .operandTypes(SqlTypeFamily.ANY) + .requiredOperands(1) + .returnTypeInference(ReturnTypes.explicit(SqlTypeName.VARCHAR)) + .build(); + + @Override + public SqlOperator calciteOperator() + { + return SQL_FUNCTION; + } + + @Override + public DruidExpression toDruidExpression( + PlannerContext plannerContext, + RowSignature rowSignature, + RexNode rexNode + ) + { + return null; + } + + @Nullable + @Override + public PostAggregator toPostAggregator( + PlannerContext plannerContext, + RowSignature rowSignature, + RexNode rexNode, + PostAggregatorVisitor postAggregatorVisitor + ) + { + final List operands = ((RexCall) rexNode).getOperands(); + final PostAggregator postAgg = OperatorConversions.toPostAggregator( + plannerContext, + rowSignature, + operands.get(0), + postAggregatorVisitor, + true + ); + + if (postAgg == null) { + return null; + } + + + return new HdrHistogramToDescribePostAggregator( + postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(), + ((FieldAccessPostAggregator)postAgg).getFieldName() + ); + } +} diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramPercentilesDescribeOperatorConversion.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramPercentilesDescribeOperatorConversion.java new file mode 100644 index 0000000..a881b9b --- /dev/null +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramPercentilesDescribeOperatorConversion.java @@ -0,0 +1,88 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram.sql; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramToPercentilesDescribePostAggregator; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.OperatorConversions; +import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; +import org.apache.druid.sql.calcite.planner.PlannerContext; + +import javax.annotation.Nullable; +import java.util.List; + +public class HdrHistogramPercentilesDescribeOperatorConversion implements SqlOperatorConversion { + private static final String FUNCTION_NAME = "HDR_GET_PERCENTILES_DESCRIPTION"; + private static final SqlFunction SQL_FUNCTION = OperatorConversions + .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME)) + .operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC) + .requiredOperands(1) + .returnTypeInference(ReturnTypes.explicit(SqlTypeName.VARCHAR)) + .build(); + + @Override + public SqlOperator calciteOperator() + { + return SQL_FUNCTION; + } + + @Override + public DruidExpression toDruidExpression( + PlannerContext plannerContext, + RowSignature rowSignature, + RexNode rexNode + ) + { + return null; + } + + @Nullable + @Override + public PostAggregator toPostAggregator( + PlannerContext plannerContext, + RowSignature rowSignature, + RexNode rexNode, + PostAggregatorVisitor postAggregatorVisitor + ) + { + final List operands = ((RexCall) rexNode).getOperands(); + final PostAggregator postAgg = OperatorConversions.toPostAggregator( + plannerContext, + rowSignature, + operands.get(0), + postAggregatorVisitor, + true + ); + + if (postAgg == null) { + return null; + } + + int percentileTicksPerHalfDistance = 5; + if (operands.size() == 2) { + if (!operands.get(1).isA(SqlKind.LITERAL)) { + return null; + } + + percentileTicksPerHalfDistance = RexLiteral.intValue(operands.get(1)); + } + + return new HdrHistogramToPercentilesDescribePostAggregator( + postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(), + ((FieldAccessPostAggregator)postAgg).getFieldName(), + percentileTicksPerHalfDistance + ); + } +} diff --git a/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramSketchTest.java b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramSketchTest.java new file mode 100644 index 0000000..cd82010 --- /dev/null +++ b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramSketchTest.java @@ -0,0 +1,79 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram; + +import org.HdrHistogram.DirectArrayHistogram; +import org.HdrHistogram.HistogramSketch; +import org.HdrHistogram.Histogramer; +import org.HdrHistogram.Percentile; +import org.apache.commons.lang3.StringUtils; +import org.junit.Test; + +import java.io.BufferedWriter; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +public class HistogramSketchTest { + + @Test + public void describeTest() throws Exception{ + DirectArrayHistogram histogram = new DirectArrayHistogram(1, 1000000, 3, + ByteBuffer.allocate(HistogramSketch.getUpdatableSerializationBytes(1, 1000000, 3))); + System.out.println(histogram.describe()); + for (int i = 0; i < 10000; i++) { + histogram.recordValue(i); + } + System.out.println(histogram.describe()); + for (Percentile percentile : histogram.percentileList(100)) { + System.out.println(percentile); + } + } + + @Test + public void describeTest1() throws Exception{ + HistogramSketch histogram = new HistogramSketch(1); + System.out.println(histogram.describe()); + for (int i = 0; i < 10000; i++) { + histogram.recordValue(i); + } + System.out.println(histogram.describe()); + for (Percentile percentile : histogram.percentileList(100)) { + System.out.println(percentile); + } + System.out.println(StringUtils.repeat('#', 100)); + histogram = new HistogramSketch(1); + for (int i = 0; i < 10000; i++) { + histogram.recordValue(ThreadLocalRandom.current().nextLong(100000)); + } + System.out.println(histogram.describe()); + for (Percentile percentile : histogram.percentileList(100)) { + System.out.println(percentile); + } + } + + @Test + public void describeTest3() throws Exception{ + HistogramSketch histogram = new HistogramSketch(3); + System.out.println(histogram.describe()); + for (int i = 0; i < 10000; i++) { + histogram.recordValue(i); + } + System.out.println(histogram.describe()); + for (Percentile percentile : histogram.percentileList(100)) { + System.out.println(percentile); + } + System.out.println(StringUtils.repeat('#', 100)); + histogram = new HistogramSketch(3); + for (int i = 0; i < 10000; i++) { + histogram.recordValue(ThreadLocalRandom.current().nextLong(100000)); + } + System.out.println(histogram.describe()); + for (Percentile percentile : histogram.percentileList(100)) { + System.out.println(percentile); + } + } + +} diff --git a/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregatorTest.java b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregatorTest.java index 639b95f..6649598 100644 --- a/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregatorTest.java +++ b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregatorTest.java @@ -219,6 +219,30 @@ public class HdrHistogramQuantileSqlAggregatorTest extends BaseCalciteQueryTest } } + @Test + public void testSqlDESCRIBE() throws Exception { + String sql = "select HDR_GET_QUANTILES(HDR_HISTOGRAM(m1, 1, 100, 2), 0, 0.25, 0.5, 0.75, 1) a, HDR_DESCRIBE(HDR_HISTOGRAM(m1, 1, 100, 2)) b, HDR_DESCRIBE(HDR_HISTOGRAM(hist_m1, 1, 100, 2)) c from druid.foo"; + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; + for (Object[] result : results) { + System.out.println(Arrays.toString(result)); + } + } + + @Test + public void testSqlDESCRIBE2() throws Exception { + String sql = "select HDR_GET_QUANTILES(HDR_HISTOGRAM(m1, 1, 100, 2), 0, 0.25, 0.5, 0.75, 1) a, HDR_GET_PERCENTILES_DESCRIPTION(HDR_HISTOGRAM(m1, 1, 100, 2)) b, HDR_GET_PERCENTILES_DESCRIPTION(HDR_HISTOGRAM(hist_m1, 1, 100, 2)) c from druid.foo"; + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; + for (Object[] result : results) { + System.out.println(Arrays.toString(result)); + } + } + @Test public void testSqlQuery() throws Exception { String[] columns = new String[]{"__time", "dim1", "dim2", "dim3", "cnt", "hist_m1", "m1"}; diff --git a/druid-udf/pom.xml b/druid-udf/pom.xml new file mode 100644 index 0000000..07b2f76 --- /dev/null +++ b/druid-udf/pom.xml @@ -0,0 +1,143 @@ + + + 4.0.0 + + org.example + druid-udf_26.0.0 + druid-udf + 1.0-SNAPSHOT + + + 11 + 11 + UTF-8 + 26.0.0 + + + + + org.apache.druid + druid-server + ${druid.version} + provided + + + + org.apache.druid + druid-sql + ${druid.version} + provided + + + + + + org.easymock + easymock + 4.3 + test + + + + org.apache.druid + druid-processing + ${druid.version} + test-jar + test + + + org.apache.druid + druid-server + ${druid.version} + test + test-jar + + + org.apache.druid + druid-sql + ${druid.version} + test-jar + test + + + junit + junit + 4.12 + test + + + com.alibaba.fastjson2 + fastjson2 + 2.0.34 + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + -Xlint:unchecked + 11 + 11 + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19.1 + + -Duser.timezone=UTC + true + + + + org.apache.maven.plugins + maven-assembly-plugin + 2.5.5 + + + distro-assembly + package + + single + + + ${project.artifactId}-${project.version} + posix + + src/assembly/assembly.xml + + + + + + + maven-release-plugin + 2.5.3 + + + org.apache.maven.scm + maven-scm-provider-gitexe + 1.9.4 + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.0.2 + + + false + + + + + + \ No newline at end of file diff --git a/druid-udf/src/assembly/assembly.xml b/druid-udf/src/assembly/assembly.xml new file mode 100644 index 0000000..8fb4519 --- /dev/null +++ b/druid-udf/src/assembly/assembly.xml @@ -0,0 +1,54 @@ + + + + + bin + + tar.gz + + + ${project.name} + + + + false + true + . + false + + + + + + . + + + README.md + LICENSE + + + + ${project.build.directory} + . + + *.jar + + + + \ No newline at end of file diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java new file mode 100644 index 0000000..d09e02a --- /dev/null +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java @@ -0,0 +1,23 @@ +package org.apache.druid.query.udf; + +import com.google.inject.Binder; +import org.apache.druid.guice.ExpressionModule; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.query.udf.expressions.DimensionBucketExprMacro; +import org.apache.druid.query.udf.sql.DimensionBucketOperatorConversion; +import org.apache.druid.sql.guice.SqlBindings; + +public class UdfModule implements DruidModule { + @Override + public void configure(Binder binder) { + SqlBindings.addOperatorConversion(binder, DimensionBucketOperatorConversion.class); + ExpressionModule.addExprMacro(binder, DimensionBucketExprMacro.class); + } + + /*@Override + public List getJacksonModules() { + // Register Jackson module for any classes we need to be able to use in JSON queries or ingestion specs. + return Collections.singletonList(new SimpleModule("UdfModule")); + }*/ + +} diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/DimensionBucketExprMacro.java b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/DimensionBucketExprMacro.java new file mode 100644 index 0000000..da8c6b6 --- /dev/null +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/DimensionBucketExprMacro.java @@ -0,0 +1,82 @@ +package org.apache.druid.query.udf.expressions; + +import org.apache.druid.math.expr.*; +import org.apache.druid.math.expr.ExprMacroTable.ExprMacro; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.stream.Collectors; + +public class DimensionBucketExprMacro implements ExprMacro { + private static final String NAME = "dimension_bucket"; + + @Override + public String name() { + return NAME; + } + + @Override + public Expr apply(List args) { + validationHelperCheckMinArgumentCount(args, 2); + Expr bucketCnt = args.get(0); + if(!bucketCnt.isLiteral()|| bucketCnt.eval(InputBindings.nilBindings()).asInt() <= 0) { + throw validationFailed("first bucketCount argument must is int literal and > 0"); + } + return new DimensionBucketExpr(args); + } + + static class DimensionBucketExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr { + private final int bucketCount; + + public DimensionBucketExpr(List args) { + super(NAME, args); + bucketCount = args.get(0).eval(InputBindings.nilBindings()).asInt(); + } + + @Override + public ExprEval eval(ObjectBinding bindings) { + int result = 1; + for (int i = 1; i < args.size(); i++) { + ExprEval eval = args.get(i).eval(bindings); + Object element = eval.value(); + if(element instanceof Object[]){ + for (Object ele : (Object[]) element) { + result = 31 * result + (ele == null ? 0 : ele.hashCode()); + } + }else{ + result = 31 * result + (element == null ? 0 : element.hashCode()); + } + + /*else if (element instanceof Number) { + //result = 31 * result + Integer.hashCode(((Number)element).intValue()); + result = 31 * result + Long.hashCode(((Number)element).longValue()); + }*/ + } + + int bucket = Math.abs(result) % bucketCount; + return ExprEval.of(IntToHexUtil.uInt16ToHexStringFast(bucket)); + } + + @Override + public Expr visit(Shuttle shuttle) { + List newArgs = args.stream().map(x -> x.visit(shuttle)).collect(Collectors.toList()); + return shuttle.visit(new DimensionBucketExpr(newArgs)); + } + + @Override + public BindingAnalysis analyzeInputs() { + return super.analyzeInputs(); + } + + @Nullable + @Override + public ExpressionType getOutputType(InputBindingInspector inspector) { + return ExpressionType.STRING; + } + + @Override + public boolean canVectorize(InputBindingInspector inspector) { + return false; + } + } +} diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/IntToHexUtil.java b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/IntToHexUtil.java new file mode 100644 index 0000000..cb133e5 --- /dev/null +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/IntToHexUtil.java @@ -0,0 +1,45 @@ +package org.apache.druid.query.udf.expressions; + +import java.nio.charset.StandardCharsets; + +public class IntToHexUtil { + static final byte[] digits = { + '0' , '1' , '2' , '3' , '4' , '5' , + '6' , '7' , '8' , '9' , 'a' , 'b' , + 'c' , 'd' , 'e' , 'f' , 'g' , 'h' , + 'i' , 'j' , 'k' , 'l' , 'm' , 'n' , + 'o' , 'p' , 'q' , 'r' , 's' , 't' , + 'u' , 'v' , 'w' , 'x' , 'y' , 'z' + }; + static final String[] uInt16HexsCache; + static final int uInt16HexsCacheSize = 8192; + + static{ + uInt16HexsCache = new String[uInt16HexsCacheSize]; + for (int i = 0; i < uInt16HexsCacheSize; i++) { + uInt16HexsCache[i] = uInt16ToHexString(i); + } + } + + public static String uInt16ToHexStringFast(int i){ + if(i < uInt16HexsCacheSize){ + return uInt16HexsCache[i]; + }else{ + return uInt16ToHexString(i); + } + } + + private static String uInt16ToHexString(int i){ + byte[] bytes = new byte[4]; + int mask = 15; // 16 - 1 + int value = i; + bytes[3] = digits[value & mask]; + value >>>= 4; + bytes[2] = digits[value & mask]; + value >>>= 4; + bytes[1] = digits[value & mask]; + value >>>= 4; + bytes[0] = digits[value & mask]; + return new String(bytes, StandardCharsets.US_ASCII); + } +} diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/sql/DimensionBucketOperatorConversion.java b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/DimensionBucketOperatorConversion.java new file mode 100644 index 0000000..2aff9cb --- /dev/null +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/DimensionBucketOperatorConversion.java @@ -0,0 +1,43 @@ +package org.apache.druid.query.udf.sql; + +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlOperandCountRanges; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.OperatorConversions; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.planner.PlannerContext; + +import javax.annotation.Nullable; + +public class DimensionBucketOperatorConversion implements SqlOperatorConversion { + private static final SqlFunction SQL_FUNCTION = new SqlFunction( + "DIMENSION_BUCKET", + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit( + factory -> Calcites.createSqlTypeWithNullability(factory, SqlTypeName.VARCHAR, true) + ), + null, + OperandTypes.variadic(SqlOperandCountRanges.from(2)), + SqlFunctionCategory.USER_DEFINED_FUNCTION + ); + + @Override + public SqlOperator calciteOperator() { + return SQL_FUNCTION; + } + + @Nullable + @Override + public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode) { + return OperatorConversions.convertDirectCall(plannerContext, rowSignature, rexNode, "dimension_bucket"); + } +} diff --git a/druid-udf/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/druid-udf/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule new file mode 100644 index 0000000..ab6de6b --- /dev/null +++ b/druid-udf/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -0,0 +1 @@ +org.apache.druid.query.udf.UdfModule \ No newline at end of file diff --git a/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/DimensionBucketExprTest.java b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/DimensionBucketExprTest.java new file mode 100644 index 0000000..51d2ce0 --- /dev/null +++ b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/DimensionBucketExprTest.java @@ -0,0 +1,146 @@ +package org.apache.druid.query.udf.expressions; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.math.expr.*; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Test; + +import java.util.Collections; + +public class DimensionBucketExprTest extends InitializedNullHandlingTest { + private final ExprMacroTable exprMacroTable = new ExprMacroTable(Collections.singletonList(new DimensionBucketExprMacro())); + Expr.ObjectBinding inputBindings = InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("string", InputBindings.inputSupplier(ExpressionType.STRING, () -> "abcdef")) + .put("long", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1234L)) + .put("double", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> 1.234)) + .put("array1", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"1", "2", "3"})) + .put("array2", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new String[]{"1", "2", "3"})) + .put("nullString", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("nullLong", InputBindings.inputSupplier(ExpressionType.LONG, () -> null)) + .put("nullDouble", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> null)) + .build() + ); + + Expr.ObjectBinding[] inputBindingArray = new Expr.ObjectBinding[]{ + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"5","7","8"})) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51")) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + // ... + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5,7,8")) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51")) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + }; + + @Test + public void test() { + Expr expr = Parser.parse("dimension_bucket(1024, 100, 'aaa', string,long,double,array1, array2, nullString, nullLong)", exprMacroTable); + ExprEval eval = expr.eval(inputBindings); + System.out.println(eval.value()); + } + + @Test + public void test2() { + for (Expr.ObjectBinding objectBinding : inputBindingArray) { + Expr expr = Parser.parse("dimension_bucket(1024, device_id, rule_id, template_id, chart_id, version, client_ip_object, server_ip_object, fqdn_category, client_ip, server_ip, server_fqdn, server_domain, application)", exprMacroTable); + ExprEval eval = expr.eval(objectBinding); + System.out.println(objectBinding.get("rule_id") + ", bucket_id:" + eval.value()); + } + } +}