diff --git a/druid-hdrhistogram/pom.xml b/druid-hdrhistogram/pom.xml
index adc85a2..892bc1a 100644
--- a/druid-hdrhistogram/pom.xml
+++ b/druid-hdrhistogram/pom.xml
@@ -5,7 +5,7 @@
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.druid.extensions</groupId>
- <artifactId>druid-hdrhistogram_0.18.1</artifactId>
+ <artifactId>druid-hdrhistogram_26.0.0</artifactId>
  <name>druid-hdrhistogram</name>
  <version>1.0-SNAPSHOT</version>
@@ -14,7 +14,7 @@
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
- <druid.version>0.18.1</druid.version>
+ <druid.version>26.0.0</druid.version>
@@ -45,6 +45,13 @@
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>4.3</version>
+ <scope>test</scope>
+ </dependency>
+
  <dependency>
  <groupId>org.apache.druid</groupId>
  <artifactId>druid-processing</artifactId>
@@ -54,9 +61,17 @@
  <groupId>org.apache.druid</groupId>
- <artifactId>druid-benchmarks</artifactId>
+ <artifactId>druid-server</artifactId>
  <version>${druid.version}</version>
  <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-sql</artifactId>
+ <version>${druid.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
  </dependency>
  <dependency>
  <groupId>junit</groupId>
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java
index 8596fc3..fd365b8 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java
@@ -1,321 +1,348 @@
-package org.apache.druid.query.aggregation.sketch.HdrHistogram;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.HdrHistogram.DirectHistogram;
-import org.HdrHistogram.Histogram;
-import org.HdrHistogram.HistogramSketch;
-import org.HdrHistogram.HistogramUnion;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.aggregation.*;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.segment.ColumnSelectorFactory;
-import org.apache.druid.segment.ColumnValueSelector;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Objects;
-
-public class HdrHistogramAggregatorFactory extends AggregatorFactory {
- public static final long DEFAULT_LOWEST = 1;
- public static final long DEFAULT_HIGHEST = 2;
- public static final int DEFAULT_SIGNIFICANT = 3;
- public static final boolean DEFAULT_AUTO_RESIZE = true;
- public static final long BUFFER_AUTO_RESIZE_HIGHEST = 100000000L * 1000000L;
- public static final Comparator COMPARATOR =
- Comparator.nullsFirst(Comparator.comparingLong(HistogramSketch::getTotalCount));
-
- protected final String name;
- protected final String fieldName;
- protected final long lowestDiscernibleValue;
- protected final long highestTrackableValue;
- protected final int numberOfSignificantValueDigits;
- protected final boolean autoResize; // default is false
-
- public HdrHistogramAggregatorFactory(
- @JsonProperty("name") String name,
- @JsonProperty("fieldName") String fieldName,
- @JsonProperty("lowestDiscernibleValue") @Nullable Long lowestDiscernibleValue,
- @JsonProperty("highestTrackableValue") @Nullable Long highestTrackableValue,
- @JsonProperty("numberOfSignificantValueDigits") @Nullable Integer numberOfSignificantValueDigits,
- @JsonProperty("autoResize") @Nullable Boolean autoResize
- ) {
- if (name == null) {
- throw new IAE("Must have a valid, non-null aggregator name");
- }
- if (fieldName == null) {
- throw new IAE("Parameter fieldName must be specified");
- }
-
- if(lowestDiscernibleValue == null){
- lowestDiscernibleValue = DEFAULT_LOWEST;
- }
- // Verify argument validity
- if (lowestDiscernibleValue < 1) {
- throw new IAE("lowestDiscernibleValue must be >= 1");
- }
- if (lowestDiscernibleValue > Long.MAX_VALUE / 2) {
- // prevent subsequent multiplication by 2 for highestTrackableValue check from overflowing
- throw new IAE("lowestDiscernibleValue must be <= Long.MAX_VALUE / 2");
- }
- if(highestTrackableValue == null){
- highestTrackableValue = DEFAULT_HIGHEST;
- }
- if (highestTrackableValue < 2L * lowestDiscernibleValue) {
- throw new IAE("highestTrackableValue must be >= 2 * lowestDiscernibleValue");
- }
- if(numberOfSignificantValueDigits == null){
- numberOfSignificantValueDigits = DEFAULT_SIGNIFICANT;
- }
- if ((numberOfSignificantValueDigits < 0) || (numberOfSignificantValueDigits > 5)) {
- throw new IAE("numberOfSignificantValueDigits must be between 0 and 5");
- }
- if(autoResize == null){
- autoResize = DEFAULT_AUTO_RESIZE;
- }
-
- this.name = name;
- this.fieldName = fieldName;
- this.lowestDiscernibleValue = lowestDiscernibleValue;
- this.highestTrackableValue = highestTrackableValue;
- this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
- this.autoResize = autoResize;
- }
-
- @Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory) {
- return new HdrHistogramAggregator(
- metricFactory.makeColumnValueSelector(fieldName),
- lowestDiscernibleValue,
- highestTrackableValue,
- numberOfSignificantValueDigits,
- autoResize
- );
- }
-
- @Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) {
- return new HdrHistogramBufferAggregator(
- metricFactory.makeColumnValueSelector(fieldName),
- lowestDiscernibleValue,
- highestTrackableValue,
- numberOfSignificantValueDigits,
- autoResize,
- getMaxIntermediateSize()
- );
- }
-
- @Override
- public Comparator getComparator() {
- return COMPARATOR;
- }
-
- @Override
- public Object combine(Object lhs, Object rhs) {
- if(lhs == null){
- return rhs;
- }else if(rhs == null){
- return lhs;
- }else{
- final HistogramUnion union = new HistogramUnion(lowestDiscernibleValue,highestTrackableValue,numberOfSignificantValueDigits,autoResize);
- union.update((HistogramSketch) lhs);
- union.update((HistogramSketch) rhs);
- HistogramSketch result = union.getResult();
- return result;
- }
- }
-
- @Override
- public AggregateCombiner makeAggregateCombiner() {
- return new ObjectAggregateCombiner() {
- private HistogramUnion union = null;
-
- @Override
- public void reset(ColumnValueSelector selector) {
- //union.reset();
- union = null;
- fold(selector);
- }
-
- @Override
- public void fold(ColumnValueSelector selector) {
- HistogramSketch h = (HistogramSketch) selector.getObject();
- if(h != null){
- if(union == null){
- union = new HistogramUnion(lowestDiscernibleValue,highestTrackableValue,numberOfSignificantValueDigits,autoResize);
- }
- union.update(h);
- }
- }
-
- @Override
- public Class classOfObject() {
- return HistogramSketch.class;
- }
-
- @Nullable
- @Override
- public HistogramSketch getObject() {
- if(union == null){
- return null;
- }else{
- HistogramSketch result = union.getResult();
- /*if(result.getTotalCount() == 0){
- return null;
- }*/
- return result;
- }
- }
- };
- }
-
- /*public Histogram geneHistogram() {
- Histogram histogram = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
- histogram.setAutoResize(autoResize);
- return histogram;
- }*/
-
- @Override
- public AggregatorFactory getCombiningFactory() {
- return new HdrHistogramMergeAggregatorFactory(name, name, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
- }
-
- @Override
- public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException {
- if (other.getName().equals(this.getName()) && other instanceof HdrHistogramAggregatorFactory) {
- HdrHistogramAggregatorFactory castedOther = (HdrHistogramAggregatorFactory) other;
-
- return new HdrHistogramMergeAggregatorFactory(name, name,
- Math.min(lowestDiscernibleValue, castedOther.lowestDiscernibleValue),
- Math.max(highestTrackableValue, castedOther.highestTrackableValue),
- Math.max(numberOfSignificantValueDigits, castedOther.numberOfSignificantValueDigits),
- autoResize || castedOther.autoResize
- );
- } else {
- throw new AggregatorFactoryNotMergeableException(this, other);
- }
- }
-
- @Override
- public List getRequiredColumns() {
- return Collections.singletonList(
- new HdrHistogramAggregatorFactory(
- fieldName,
- fieldName,
- lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize
- )
- );
- }
-
- @Override
- public Object deserialize(Object object) {
- return HistogramUtils.deserializeHistogram(object);
- }
-
- @Nullable
- @Override
- public Object finalizeComputation(@Nullable Object object) {
- return object == null ? null : ((HistogramSketch) object).getTotalCount();
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public String getFieldName() {
- return fieldName;
- }
-
- @JsonProperty
- public long getLowestDiscernibleValue() {
- return lowestDiscernibleValue;
- }
-
- @JsonProperty
- public long getHighestTrackableValue() {
- return highestTrackableValue;
- }
-
- @JsonProperty
- public int getNumberOfSignificantValueDigits() {
- return numberOfSignificantValueDigits;
- }
-
- @JsonProperty
- public boolean isAutoResize() {
- return autoResize;
- }
-
- @Override
- public String getTypeName() {
- return HdrHistogramModule.HDRHISTOGRAM_TYPE_NAME;
- }
-
- @Override
- public List requiredFields() {
- return Collections.singletonList(fieldName);
- }
-
-
- @Override
- public int getMaxIntermediateSize() {
- if(!autoResize){
- /*Histogram histogram = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
- histogram.setAutoResize(autoResize);
- return histogram.getNeededByteBufferCapacity();*/
- return HistogramSketch.getUpdatableSerializationBytes(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
- }else{
- //return (1 << 10) * 512;
- return HistogramSketch.getUpdatableSerializationBytes(lowestDiscernibleValue, BUFFER_AUTO_RESIZE_HIGHEST, numberOfSignificantValueDigits);
- }
- }
-
- @Override
- public byte[] getCacheKey() {
- return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_BUILD_CACHE_TYPE_ID)
- .appendString(name).appendString(fieldName)
- .appendDouble(lowestDiscernibleValue).appendDouble(highestTrackableValue)
- .appendInt(numberOfSignificantValueDigits).appendBoolean(autoResize)
- .build();
- }
-
- @Override
- public boolean equals(final Object o){
- if (this == o) {
- return true;
- }
- if (o == null || !getClass().equals(o.getClass())) {
- return false;
- }
-
- HdrHistogramAggregatorFactory that = (HdrHistogramAggregatorFactory) o;
- return name.equals(that.name) && fieldName.equals(that.fieldName) &&
- lowestDiscernibleValue == that.lowestDiscernibleValue &&
- highestTrackableValue == that.highestTrackableValue &&
- numberOfSignificantValueDigits == that.numberOfSignificantValueDigits &&
- autoResize == that.autoResize
- ;
- }
-
- @Override
- public int hashCode(){
- return Objects.hash(name, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
- }
-
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "{" +
- "name='" + name + '\'' +
- ", fieldName='" + fieldName + '\'' +
- ", lowestDiscernibleValue=" + lowestDiscernibleValue +
- ", highestTrackableValue=" + highestTrackableValue +
- ", numberOfSignificantValueDigits=" + numberOfSignificantValueDigits +
- ", autoResize=" + autoResize +
- '}';
- }
-}
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.HdrHistogram.HistogramSketch;
+import org.HdrHistogram.HistogramUnion;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.*;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class HdrHistogramAggregatorFactory extends AggregatorFactory {
+ public static final long DEFAULT_LOWEST = 1;
+ public static final long DEFAULT_HIGHEST = 2;
+ public static final int DEFAULT_SIGNIFICANT = 1;
+ public static final boolean DEFAULT_AUTO_RESIZE = true;
+ public static final long BUFFER_AUTO_RESIZE_HIGHEST = 100000000L * 1000000L;
+ public static final Comparator<HistogramSketch> COMPARATOR =
+ Comparator.nullsFirst(Comparator.comparingLong(HistogramSketch::getTotalCount));
+
+ protected final String name;
+ protected final String fieldName;
+ protected final long lowestDiscernibleValue;
+ protected final long highestTrackableValue;
+ protected final int numberOfSignificantValueDigits;
+ protected final boolean autoResize; // HdrHistogram's own default is false; this factory defaults it to true
+ protected final int updatableSerializationBytes;
+
+ public HdrHistogramAggregatorFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("lowestDiscernibleValue") @Nullable Long lowestDiscernibleValue,
+ @JsonProperty("highestTrackableValue") @Nullable Long highestTrackableValue,
+ @JsonProperty("numberOfSignificantValueDigits") @Nullable Integer numberOfSignificantValueDigits,
+ @JsonProperty("autoResize") @Nullable Boolean autoResize
+ ) {
+ if (name == null) {
+ throw new IAE("Must have a valid, non-null aggregator name");
+ }
+ if (fieldName == null) {
+ throw new IAE("Parameter fieldName must be specified");
+ }
+
+ if(lowestDiscernibleValue == null){
+ lowestDiscernibleValue = DEFAULT_LOWEST;
+ }
+ // Verify argument validity
+ if (lowestDiscernibleValue < 1) {
+ throw new IAE("lowestDiscernibleValue must be >= 1");
+ }
+ if (lowestDiscernibleValue > Long.MAX_VALUE / 2) {
+ // prevent subsequent multiplication by 2 for highestTrackableValue check from overflowing
+ throw new IAE("lowestDiscernibleValue must be <= Long.MAX_VALUE / 2");
+ }
+ if(highestTrackableValue == null){
+ highestTrackableValue = DEFAULT_HIGHEST;
+ }
+ if (highestTrackableValue < 2L * lowestDiscernibleValue) {
+ throw new IAE("highestTrackableValue must be >= 2 * lowestDiscernibleValue");
+ }
+ if(numberOfSignificantValueDigits == null){
+ numberOfSignificantValueDigits = DEFAULT_SIGNIFICANT;
+ }
+ if ((numberOfSignificantValueDigits < 0) || (numberOfSignificantValueDigits > 5)) {
+ throw new IAE("numberOfSignificantValueDigits must be between 0 and 5");
+ }
+ if(autoResize == null){
+ autoResize = DEFAULT_AUTO_RESIZE;
+ }
+
+ this.name = name;
+ this.fieldName = fieldName;
+ this.lowestDiscernibleValue = lowestDiscernibleValue;
+ this.highestTrackableValue = highestTrackableValue;
+ this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
+ this.autoResize = autoResize;
+ this.updatableSerializationBytes = getUpdatableSerializationBytes();
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory metricFactory) {
+ return new HdrHistogramAggregator(
+ metricFactory.makeColumnValueSelector(fieldName),
+ lowestDiscernibleValue,
+ highestTrackableValue,
+ numberOfSignificantValueDigits,
+ autoResize
+ );
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) {
+ return new HdrHistogramBufferAggregator(
+ metricFactory.makeColumnValueSelector(fieldName),
+ lowestDiscernibleValue,
+ highestTrackableValue,
+ numberOfSignificantValueDigits,
+ autoResize,
+ getMaxIntermediateSize()
+ );
+ }
+
+ @Override
+ public Comparator getComparator() {
+ return COMPARATOR;
+ }
+
+ @Override
+ public Object combine(Object lhs, Object rhs) {
+ if(lhs == null){
+ return rhs;
+ }else if(rhs == null){
+ return lhs;
+ }else{
+ final HistogramUnion union = new HistogramUnion(lowestDiscernibleValue,highestTrackableValue,numberOfSignificantValueDigits,autoResize);
+ union.update((HistogramSketch) lhs);
+ union.update((HistogramSketch) rhs);
+ HistogramSketch result = union.getResult();
+ return result;
+ }
+ }
+
+ @Override
+ public AggregateCombiner makeAggregateCombiner() {
+ return new ObjectAggregateCombiner<HistogramSketch>() {
+ private HistogramUnion union = null;
+
+ @Override
+ public void reset(ColumnValueSelector selector) {
+ //union.reset();
+ union = null;
+ fold(selector);
+ }
+
+ @Override
+ public void fold(ColumnValueSelector selector) {
+ HistogramSketch h = (HistogramSketch) selector.getObject();
+ if(h != null){
+ if(union == null){
+ union = new HistogramUnion(lowestDiscernibleValue,highestTrackableValue,numberOfSignificantValueDigits,autoResize);
+ }
+ union.update(h);
+ }
+ }
+
+ @Override
+ public Class<HistogramSketch> classOfObject() {
+ return HistogramSketch.class;
+ }
+
+ @Nullable
+ @Override
+ public HistogramSketch getObject() {
+ if(union == null){
+ return null;
+ }else{
+ HistogramSketch result = union.getResult();
+ /*if(result.getTotalCount() == 0){
+ return null;
+ }*/
+ return result;
+ }
+ }
+ };
+ }
+
+ /*public Histogram geneHistogram() {
+ Histogram histogram = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+ histogram.setAutoResize(autoResize);
+ return histogram;
+ }*/
+
+ @Override
+ public AggregatorFactory getCombiningFactory() {
+ return new HdrHistogramMergeAggregatorFactory(name, name, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+ }
+
+ @Override
+ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException {
+ if (other.getName().equals(this.getName()) && other instanceof HdrHistogramAggregatorFactory) {
+ HdrHistogramAggregatorFactory castedOther = (HdrHistogramAggregatorFactory) other;
+
+ return new HdrHistogramMergeAggregatorFactory(name, name,
+ Math.min(lowestDiscernibleValue, castedOther.lowestDiscernibleValue),
+ Math.max(highestTrackableValue, castedOther.highestTrackableValue),
+ Math.max(numberOfSignificantValueDigits, castedOther.numberOfSignificantValueDigits),
+ autoResize || castedOther.autoResize
+ );
+ } else {
+ throw new AggregatorFactoryNotMergeableException(this, other);
+ }
+ }
+
+ @Override
+ public List<AggregatorFactory> getRequiredColumns() {
+ return Collections.singletonList(
+ new HdrHistogramAggregatorFactory(
+ fieldName,
+ fieldName,
+ lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize
+ )
+ );
+ }
+
+ @Override
+ public AggregatorFactory withName(String newName) {
+ return new HdrHistogramAggregatorFactory(newName, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+ }
+
+ @Override
+ public Object deserialize(Object object) {
+ if (object == null) {
+ return null;
+ }
+ return HistogramUtils.deserializeHistogram(object);
+ }
+
+ @Override
+ public ColumnType getResultType() {
+ // The finalized value is the sketch itself (see finalizeComputation), so the result type matches the intermediate type.
+ return getIntermediateType();
+ }
+
+ @Nullable
+ @Override
+ public Object finalizeComputation(@Nullable Object object) {
+ // Previously finalized to getTotalCount(); the sketch itself is now returned unchanged.
+ return object;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public long getLowestDiscernibleValue() {
+ return lowestDiscernibleValue;
+ }
+
+ @JsonProperty
+ public long getHighestTrackableValue() {
+ return highestTrackableValue;
+ }
+
+ @JsonProperty
+ public int getNumberOfSignificantValueDigits() {
+ return numberOfSignificantValueDigits;
+ }
+
+ @JsonProperty
+ public boolean isAutoResize() {
+ return autoResize;
+ }
+
+ /*
+ getTypeName() no longer exists; newer Druid versions require implementing getIntermediateType() instead.
+ @Override
+ public String getTypeName() {
+ return HdrHistogramModule.HDRHISTOGRAM_TYPE_NAME;
+ }*/
+
+ @Override
+ public ColumnType getIntermediateType() {
+ return HdrHistogramModule.TYPE;
+ }
+
+ @Override
+ public List<String> requiredFields() {
+ return Collections.singletonList(fieldName);
+ }
+
+
+ @Override
+ public int getMaxIntermediateSize() {
+ return updatableSerializationBytes == 0 ? getUpdatableSerializationBytes() : updatableSerializationBytes;
+ }
+
+ private int getUpdatableSerializationBytes(){
+ if(!autoResize){
+ /*Histogram histogram = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+ histogram.setAutoResize(autoResize);
+ return histogram.getNeededByteBufferCapacity();*/
+ return HistogramSketch.getUpdatableSerializationBytes(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+ }else{
+ //return (1 << 10) * 512;
+ return HistogramSketch.getUpdatableSerializationBytes(lowestDiscernibleValue, BUFFER_AUTO_RESIZE_HIGHEST, numberOfSignificantValueDigits);
+ }
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_BUILD_CACHE_TYPE_ID)
+ .appendString(name).appendString(fieldName)
+ .appendDouble(lowestDiscernibleValue).appendDouble(highestTrackableValue)
+ .appendInt(numberOfSignificantValueDigits).appendBoolean(autoResize)
+ .build();
+ }
+
+ @Override
+ public boolean equals(final Object o){
+ if (this == o) {
+ return true;
+ }
+ if (o == null || !getClass().equals(o.getClass())) {
+ return false;
+ }
+
+ HdrHistogramAggregatorFactory that = (HdrHistogramAggregatorFactory) o;
+ return name.equals(that.name) && fieldName.equals(that.fieldName) &&
+ lowestDiscernibleValue == that.lowestDiscernibleValue &&
+ highestTrackableValue == that.highestTrackableValue &&
+ numberOfSignificantValueDigits == that.numberOfSignificantValueDigits &&
+ autoResize == that.autoResize
+ ;
+ }
+
+ @Override
+ public int hashCode(){
+ return Objects.hash(name, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+ }
+
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ ", lowestDiscernibleValue=" + lowestDiscernibleValue +
+ ", highestTrackableValue=" + highestTrackableValue +
+ ", numberOfSignificantValueDigits=" + numberOfSignificantValueDigits +
+ ", autoResize=" + autoResize +
+ '}';
+ }
+}
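The combine() path above merges two intermediate sketches through a HistogramUnion. Below is a minimal, self-contained sketch of that behavior, using only the HistogramSketch/HistogramUnion calls that appear in this diff; the recorded values and constructor arguments are illustrative, not taken from the module.

import org.HdrHistogram.HistogramSketch;
import org.HdrHistogram.HistogramUnion;

public class CombineDemo {
    public static void main(String[] args) {
        // Two partial results, as combine(lhs, rhs) would receive them.
        HistogramSketch lhs = new HistogramSketch(3); // 3 significant value digits
        HistogramSketch rhs = new HistogramSketch(3);
        lhs.recordValue(10);
        rhs.recordValue(1000);

        // Mirrors combine(): a union configured like the factory (lowest, highest, digits, autoResize).
        HistogramUnion union = new HistogramUnion(1, 2, 3, true);
        union.update(lhs);
        union.update(rhs);

        System.out.println(union.getResult().getTotalCount()); // 2
    }
}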
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeAggregatorFactory.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeAggregatorFactory.java
index 2198f06..85dae33 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeAggregatorFactory.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeAggregatorFactory.java
@@ -1,9 +1,9 @@
package org.apache.druid.query.aggregation.sketch.HdrHistogram;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramSketch;
import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory;
@@ -48,6 +48,11 @@ public class HdrHistogramMergeAggregatorFactory extends HdrHistogramAggregatorFa
);
}
+ @Override
+ public AggregatorFactory withName(String newName) {
+ return new HdrHistogramMergeAggregatorFactory(newName, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+ }
+
@Override
public byte[] getCacheKey() {
return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_MERGE_CACHE_TYPE_ID)
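The withName override added here satisfies a requirement of Druid's newer AggregatorFactory API: the engine re-labels factories when rewriting queries, and the override must return an otherwise-identical copy under the new output name. A small usage sketch against the base factory (the field name "latencyMs" is made up; null parameters fall back to the factory defaults):

public class WithNameDemo {
    public static void main(String[] args) {
        HdrHistogramAggregatorFactory base = new HdrHistogramAggregatorFactory(
            "histo", "latencyMs", null, null, null, null);
        System.out.println(base.withName("histo2").getName());   // histo2
        System.out.println(base.withName("histo").equals(base)); // true: only the name may differ
    }
}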
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramModule.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramModule.java
index 117feda..5041965 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramModule.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramModule.java
@@ -7,13 +7,13 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
-import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramSketch;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramObjectSqlAggregator;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramPercentilesOperatorConversion;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramQuantileSqlAggregator;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramQuantilesOperatorConversion;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.sql.guice.SqlBindings;
@@ -29,6 +29,7 @@ public class HdrHistogramModule implements DruidModule {
public static final byte QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_CACHE_TYPE_ID = 0x05;
public static final String HDRHISTOGRAM_TYPE_NAME = "HdrHistogramSketch";
+ public static final ColumnType TYPE = ColumnType.ofComplex(HDRHISTOGRAM_TYPE_NAME);
public static final ObjectMapper objectMapper = new ObjectMapper();
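The new TYPE constant replaces the old string-based getTypeName() contract: aggregators now report a ColumnType. A minimal sketch of what ColumnType.ofComplex produces, using only druid-processing classes (nothing specific to this module):

import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;

public class ComplexTypeDemo {
    public static void main(String[] args) {
        ColumnType type = ColumnType.ofComplex("HdrHistogramSketch");
        System.out.println(type.is(ValueType.COMPLEX)); // true; the same check the SQL code below uses
        System.out.println(type.getComplexTypeName());  // HdrHistogramSketch
    }
}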
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java
index 96ba73a..e7cc955 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java
@@ -1,111 +1,121 @@
-package org.apache.druid.query.aggregation.sketch.HdrHistogram;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Sets;
-import org.HdrHistogram.HistogramSketch;
-import org.HdrHistogram.Percentile;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-
-import javax.annotation.Nullable;
-import java.util.*;
-
-public class HdrHistogramToPercentilesPostAggregator implements PostAggregator {
- private final String name;
- private final String fieldName;
- private final int percentileTicksPerHalfDistance;
-
- @JsonCreator
- public HdrHistogramToPercentilesPostAggregator(
- @JsonProperty("name") String name,
- @JsonProperty("fieldName") String fieldName,
- @JsonProperty("percentileTicksPerHalfDistance") int percentileTicksPerHalfDistance
- ){
- this.name = name;
- this.fieldName = fieldName;
- this.percentileTicksPerHalfDistance = percentileTicksPerHalfDistance;
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public String getFieldName() {
- return fieldName;
- }
-
- @JsonProperty
- public int getPercentileTicksPerHalfDistance() {
- return percentileTicksPerHalfDistance;
- }
-
- @Nullable
- @Override
- public Object compute(Map values) {
- HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
- List percentiles = histogram.percentileList(percentileTicksPerHalfDistance);
- return HdrHistogramModule.toJson(percentiles);
- }
-
- @Override
- public Comparator getComparator()
- {
- throw new IAE("Comparing arrays of quantiles is not supported");
- }
-
- @Override
- public Set getDependentFields()
- {
- return Sets.newHashSet(fieldName);
- }
-
- @Override
- public PostAggregator decorate(Map aggregators) {
- return this;
- }
-
- @Override
- public byte[] getCacheKey() {
- CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_CACHE_TYPE_ID)
- .appendString(fieldName);
- builder.appendInt(percentileTicksPerHalfDistance);
- return builder.build();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- HdrHistogramToPercentilesPostAggregator that = (HdrHistogramToPercentilesPostAggregator) o;
-
- return percentileTicksPerHalfDistance == that.percentileTicksPerHalfDistance &&
- name.equals(that.name) &&
- fieldName.equals(that.fieldName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, fieldName, percentileTicksPerHalfDistance);
- }
-
- @Override
- public String toString() {
- return "HdrHistogramToPercentilesPostAggregator{" +
- "name='" + name + '\'' +
- ", fieldName='" + fieldName + '\'' +
- ", probabilitys=" + percentileTicksPerHalfDistance +
- '}';
- }
-
-
-}
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Sets;
+import org.HdrHistogram.HistogramSketch;
+import org.HdrHistogram.Percentile;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+public class HdrHistogramToPercentilesPostAggregator implements PostAggregator {
+ private final String name;
+ private final String fieldName;
+ private final int percentileTicksPerHalfDistance;
+
+ @JsonCreator
+ public HdrHistogramToPercentilesPostAggregator(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("percentileTicksPerHalfDistance") int percentileTicksPerHalfDistance
+ ){
+ this.name = name;
+ this.fieldName = fieldName;
+ this.percentileTicksPerHalfDistance = percentileTicksPerHalfDistance;
+ }
+
+ @Override
+ public ColumnType getType(ColumnInspector signature){
+ return ColumnType.STRING;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public int getPercentileTicksPerHalfDistance() {
+ return percentileTicksPerHalfDistance;
+ }
+
+ @Nullable
+ @Override
+ public Object compute(Map<String, Object> values) {
+ HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
+ if(histogram == null){
+ return "[]"; //"[]"
+ }
+ List<Percentile> percentiles = histogram.percentileList(percentileTicksPerHalfDistance);
+ return HdrHistogramModule.toJson(percentiles);
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ throw new IAE("Comparing arrays of quantiles is not supported");
+ }
+
+ @Override
+ public Set<String> getDependentFields()
+ {
+ return Sets.newHashSet(fieldName);
+ }
+
+ @Override
+ public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
+ return this;
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_CACHE_TYPE_ID)
+ .appendString(fieldName);
+ builder.appendInt(percentileTicksPerHalfDistance);
+ return builder.build();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HdrHistogramToPercentilesPostAggregator that = (HdrHistogramToPercentilesPostAggregator) o;
+
+ return percentileTicksPerHalfDistance == that.percentileTicksPerHalfDistance &&
+ name.equals(that.name) &&
+ fieldName.equals(that.fieldName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, fieldName, percentileTicksPerHalfDistance);
+ }
+
+ @Override
+ public String toString() {
+ return "HdrHistogramToPercentilesPostAggregator{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ ", probabilitys=" + percentileTicksPerHalfDistance +
+ '}';
+ }
+
+
+}
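A usage sketch for the post-aggregator above, showing the new null handling in compute(); the field name "latencySketch" and the recorded values are illustrative:

import org.HdrHistogram.HistogramSketch;
import java.util.HashMap;
import java.util.Map;

public class PercentilesDemo {
    public static void main(String[] args) {
        HistogramSketch sketch = new HistogramSketch(3);
        for (int i = 1; i <= 100; i++) {
            sketch.recordValue(i);
        }
        Map<String, Object> row = new HashMap<>();
        row.put("latencySketch", sketch);

        HdrHistogramToPercentilesPostAggregator postAgg =
            new HdrHistogramToPercentilesPostAggregator("p", "latencySketch", 5);
        System.out.println(postAgg.compute(row));             // JSON list of percentile points
        System.out.println(postAgg.compute(new HashMap<>())); // "[]" now, instead of an NPE
    }
}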
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java
index e7f37c9..5b13b90 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java
@@ -1,118 +1,128 @@
-package org.apache.druid.query.aggregation.sketch.HdrHistogram;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Sets;
-import org.HdrHistogram.Histogram;
-import org.HdrHistogram.HistogramSketch;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-
-import javax.annotation.Nullable;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-public class HdrHistogramToQuantilePostAggregator implements PostAggregator {
- private final String name;
- private final String fieldName;
- private final float probability;
-
- @JsonCreator
- public HdrHistogramToQuantilePostAggregator(
- @JsonProperty("name") String name,
- @JsonProperty("fieldName") String fieldName,
- @JsonProperty("probability") float probability
- ){
- this.name = name;
- this.fieldName = fieldName;
- this.probability = probability;
-
- if (probability < 0 || probability > 1) {
- throw new IAE("Illegal probability[%s], must be strictly between 0 and 1", probability);
- }
- }
-
- @Override
- public Set getDependentFields() {
- return Sets.newHashSet(fieldName);
- }
-
- @Override
- public Comparator getComparator() {
- return new Comparator(){
- @Override
- public int compare(final Long a, final Long b){
- return Long.compare(a, b);
- }
- };
- }
-
- @Nullable
- @Override
- public Object compute(Map values) {
- HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
- return histogram.getValueAtPercentile(probability * 100);
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public String getFieldName() {
- return fieldName;
- }
-
- @JsonProperty
- public double getProbability() {
- return probability;
- }
-
- @Override
- public PostAggregator decorate(Map aggregators) {
- return this;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- HdrHistogramToQuantilePostAggregator that = (HdrHistogramToQuantilePostAggregator) o;
-
- return Float.compare(that.probability, probability) == 0 &&
- name.equals(that.name) &&
- fieldName.equals(that.fieldName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, fieldName, probability);
- }
-
- @Override
- public String toString() {
- return "HdrHistogramToQuantilePostAggregator{" +
- "name='" + name + '\'' +
- ", fieldName='" + fieldName + '\'' +
- ", probability=" + probability +
- '}';
- }
-
- @Override
- public byte[] getCacheKey() {
- return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_QUANTILE_CACHE_TYPE_ID)
- .appendString(fieldName)
- .appendFloat(probability)
- .build();
- }
-}
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Sets;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class HdrHistogramToQuantilePostAggregator implements PostAggregator {
+ private final String name;
+ private final String fieldName;
+ private final float probability;
+
+ @JsonCreator
+ public HdrHistogramToQuantilePostAggregator(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("probability") float probability
+ ){
+ this.name = name;
+ this.fieldName = fieldName;
+ this.probability = probability;
+
+ if (probability < 0 || probability > 1) {
+ throw new IAE("Illegal probability[%s], must be strictly between 0 and 1", probability);
+ }
+ }
+
+ @Override
+ public ColumnType getType(ColumnInspector signature){
+ return ColumnType.LONG;
+ }
+
+ @Override
+ public Set<String> getDependentFields() {
+ return Sets.newHashSet(fieldName);
+ }
+
+ @Override
+ public Comparator getComparator() {
+ return new Comparator<Long>(){
+ @Override
+ public int compare(final Long a, final Long b){
+ return Long.compare(a, b);
+ }
+ };
+ }
+
+ @Nullable
+ @Override
+ public Object compute(Map<String, Object> values) {
+ HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
+ if(histogram == null){
+ return null;
+ }
+ return histogram.getValueAtPercentile(probability * 100);
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public double getProbability() {
+ return probability;
+ }
+
+ @Override
+ public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HdrHistogramToQuantilePostAggregator that = (HdrHistogramToQuantilePostAggregator) o;
+
+ return Float.compare(that.probability, probability) == 0 &&
+ name.equals(that.name) &&
+ fieldName.equals(that.fieldName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, fieldName, probability);
+ }
+
+ @Override
+ public String toString() {
+ return "HdrHistogramToQuantilePostAggregator{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ ", probability=" + probability +
+ '}';
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_QUANTILE_CACHE_TYPE_ID)
+ .appendString(fieldName)
+ .appendFloat(probability)
+ .build();
+ }
+}
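The probability-to-percentile conversion in compute() is the only arithmetic here: a probability in [0, 1] becomes a percentile in [0, 100]. A small sketch, with illustrative values:

import org.HdrHistogram.HistogramSketch;

public class QuantileDemo {
    public static void main(String[] args) {
        HistogramSketch sketch = new HistogramSketch(3);
        for (int i = 1; i <= 10_000; i++) {
            sketch.recordValue(i);
        }
        float probability = 0.99f;
        // Same call compute() makes: probability * 100 is the 99th percentile.
        long p99 = sketch.getValueAtPercentile(probability * 100);
        System.out.println(p99); // ~9900, within the sketch's 3-digit precision
    }
}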
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java
index 216947f..9dc7761 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java
@@ -1,114 +1,125 @@
-package org.apache.druid.query.aggregation.sketch.HdrHistogram;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Sets;
-import org.HdrHistogram.Histogram;
-import org.HdrHistogram.HistogramSketch;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-
-import javax.annotation.Nullable;
-import java.util.*;
-
-public class HdrHistogramToQuantilesPostAggregator implements PostAggregator {
- private final String name;
- private final String fieldName;
- private final float[] probabilitys;
-
- @JsonCreator
- public HdrHistogramToQuantilesPostAggregator(
- @JsonProperty("name") String name,
- @JsonProperty("fieldName") String fieldName,
- @JsonProperty("probabilitys") float[] probabilitys
- ){
- this.name = name;
- this.fieldName = fieldName;
- this.probabilitys = probabilitys;
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public String getFieldName() {
- return fieldName;
- }
-
- @JsonProperty
- public float[] getProbabilitys() {
- return probabilitys;
- }
-
- @Nullable
- @Override
- public Object compute(Map values) {
- HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
- final long[] counts = new long[probabilitys.length];
- for (int i = 0; i < probabilitys.length; i++) {
- counts[i] = histogram.getValueAtPercentile(probabilitys[i] * 100);
- }
- return counts;
- }
-
- @Override
- public Comparator getComparator()
- {
- throw new IAE("Comparing arrays of quantiles is not supported");
- }
-
- @Override
- public Set getDependentFields()
- {
- return Sets.newHashSet(fieldName);
- }
-
- @Override
- public PostAggregator decorate(Map aggregators) {
- return this;
- }
-
- @Override
- public byte[] getCacheKey() {
- CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_QUANTILES_CACHE_TYPE_ID)
- .appendString(fieldName);
- for (float probability : probabilitys) {
- builder.appendFloat(probability);
- }
- return builder.build();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- HdrHistogramToQuantilesPostAggregator that = (HdrHistogramToQuantilesPostAggregator) o;
-
- return Arrays.equals(probabilitys, that.probabilitys) &&
- name.equals(that.name) &&
- fieldName.equals(that.fieldName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, fieldName, Arrays.hashCode(probabilitys));
- }
-
- @Override
- public String toString() {
- return "HdrHistogramToQuantilesPostAggregator{" +
- "name='" + name + '\'' +
- ", fieldName='" + fieldName + '\'' +
- ", probabilitys=" + Arrays.toString(probabilitys) +
- '}';
- }
-}
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Sets;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+public class HdrHistogramToQuantilesPostAggregator implements PostAggregator {
+ private final String name;
+ private final String fieldName;
+ private final float[] probabilitys;
+
+ @JsonCreator
+ public HdrHistogramToQuantilesPostAggregator(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("probabilitys") float[] probabilitys
+ ){
+ this.name = name;
+ this.fieldName = fieldName;
+ this.probabilitys = probabilitys;
+ }
+
+ @Override
+ public ColumnType getType(ColumnInspector signature){
+ return ColumnType.LONG_ARRAY;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public float[] getProbabilitys() {
+ return probabilitys;
+ }
+
+ @Nullable
+ @Override
+ public Object compute(Map<String, Object> values) {
+ HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
+ if(histogram == null){
+ // Return an array of nulls rather than null, so the LONG_ARRAY result keeps its shape.
+ return new Long[probabilitys.length];
+ }
+ final Long[] counts = new Long[probabilitys.length];
+ for (int i = 0; i < probabilitys.length; i++) {
+ counts[i] = histogram.getValueAtPercentile(probabilitys[i] * 100);
+ }
+ return counts;
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ throw new IAE("Comparing arrays of quantiles is not supported");
+ }
+
+ @Override
+ public Set<String> getDependentFields()
+ {
+ return Sets.newHashSet(fieldName);
+ }
+
+ @Override
+ public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
+ return this;
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_QUANTILES_CACHE_TYPE_ID)
+ .appendString(fieldName);
+ for (float probability : probabilitys) {
+ builder.appendFloat(probability);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HdrHistogramToQuantilesPostAggregator that = (HdrHistogramToQuantilesPostAggregator) o;
+
+ return Arrays.equals(probabilitys, that.probabilitys) &&
+ name.equals(that.name) &&
+ fieldName.equals(that.fieldName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, fieldName, Arrays.hashCode(probabilitys));
+ }
+
+ @Override
+ public String toString() {
+ return "HdrHistogramToQuantilesPostAggregator{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ ", probabilitys=" + Arrays.toString(probabilitys) +
+ '}';
+ }
+}
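The switch from long[] to Long[] matches the LONG_ARRAY result type declared in getType(): a missing sketch now yields an array of nulls, which a primitive array cannot represent. A one-screen illustration of that shape guarantee:

public class NullSlotsDemo {
    public static void main(String[] args) {
        float[] probabilitys = {0.5f, 0.9f, 0.99f};
        Long[] counts = new Long[probabilitys.length]; // what compute() returns for a missing sketch
        for (Long count : counts) {
            System.out.println(count); // null — one slot per requested probability
        }
    }
}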
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramObjectSqlAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramObjectSqlAggregator.java
index 6a47da7..5d522b6 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramObjectSqlAggregator.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramObjectSqlAggregator.java
@@ -18,6 +18,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramMergeAggregatorFactory;
import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
@@ -118,11 +119,11 @@ public class HdrHistogramObjectSqlAggregator implements SqlAggregator {
}
// No existing match found. Create a new one.
- final List virtualColumns = new ArrayList<>();
+ // Removed in the new version: final List<VirtualColumn> virtualColumns = new ArrayList<>();
if (input.isDirectColumnAccess()) {
// the argument is a Histogram object
- if (rowSignature.getColumnType(input.getDirectColumn()).orElse(null) == ValueType.COMPLEX) {
+ if (rowSignature.getColumnType(input.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) {
aggregatorFactory = new HdrHistogramMergeAggregatorFactory(
histogramName,
input.getDirectColumn(),
@@ -142,12 +143,11 @@ public class HdrHistogramObjectSqlAggregator implements SqlAggregator {
);
}
} else {
- final VirtualColumn virtualColumn =
- virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, input, SqlTypeName.BIGINT);
- virtualColumns.add(virtualColumn);
+ final String virtualColumnName =
+ virtualColumnRegistry.getOrCreateVirtualColumnForExpression(input, ColumnType.LONG);
aggregatorFactory = new HdrHistogramAggregatorFactory(
histogramName,
- virtualColumn.getOutputName(),
+ virtualColumnName,
lowestDiscernibleValue,
highestTrackableValue,
numberOfSignificantValueDigits,
@@ -156,7 +156,6 @@ public class HdrHistogramObjectSqlAggregator implements SqlAggregator {
}
return Aggregation.create(
- virtualColumns,
ImmutableList.of(aggregatorFactory),
null
);
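The virtual-column migration above follows Druid 26's VirtualColumnRegistry: the registry now owns the columns, hands back only a name, and Aggregation.create no longer takes a column list. A minimal sketch of the new call pattern (the registry and expression come from the planner, as in the surrounding code):

import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;

public class VirtualColumnSketch {
    static String fieldNameFor(VirtualColumnRegistry registry, DruidExpression input) {
        // Druid 0.18 returned a VirtualColumn (via plannerContext + SqlTypeName) that had to be
        // collected and passed to Aggregation.create; Druid 26 returns just the column name.
        return registry.getOrCreateVirtualColumnForExpression(input, ColumnType.LONG);
    }
}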
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramPercentilesOperatorConversion.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramPercentilesOperatorConversion.java
index 710fd69..d683f0e 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramPercentilesOperatorConversion.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramPercentilesOperatorConversion.java
@@ -14,16 +14,16 @@ import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramToPerc
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
+import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
-public class HdrHistogramPercentilesOperatorConversion extends DirectOperatorConversion {
+public class HdrHistogramPercentilesOperatorConversion implements SqlOperatorConversion {
private static final String FUNCTION_NAME = "HDR_GET_PERCENTILES";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
@@ -32,10 +32,6 @@ public class HdrHistogramPercentilesOperatorConversion extends DirectOperatorCon
.returnTypeInference(ReturnTypes.explicit(SqlTypeName.VARCHAR))
.build();
- public HdrHistogramPercentilesOperatorConversion() {
- super(SQL_FUNCTION, FUNCTION_NAME);
- }
-
@Override
public SqlOperator calciteOperator()
{
@@ -66,7 +62,8 @@ public class HdrHistogramPercentilesOperatorConversion extends DirectOperatorCon
plannerContext,
rowSignature,
operands.get(0),
- postAggregatorVisitor
+ postAggregatorVisitor,
+ true
);
if (postAgg == null) {
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregator.java
index b23489d..b14c1aa 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregator.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregator.java
@@ -16,6 +16,7 @@ import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggreg
import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramMergeAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramToQuantilePostAggregator;
import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@@ -141,22 +142,16 @@ public class HdrHistogramQuantileSqlAggregator implements SqlAggregator {
// Check input for equivalence.
final boolean inputMatches;
- final VirtualColumn virtualInput = existing.getVirtualColumns()
- .stream()
- .filter(
- virtualColumn ->
- virtualColumn.getOutputName()
- .equals(theFactory.getFieldName())
- )
- .findFirst()
- .orElse(null);
+ final DruidExpression virtualInput =
+ virtualColumnRegistry.findVirtualColumnExpressions(theFactory.requiredFields())
+ .stream()
+ .findFirst()
+ .orElse(null);
if (virtualInput == null) {
- inputMatches = input.isDirectColumnAccess()
- && input.getDirectColumn().equals(theFactory.getFieldName());
+ inputMatches = input.isDirectColumnAccess() && input.getDirectColumn().equals(theFactory.getFieldName());
} else {
- inputMatches = ((ExpressionVirtualColumn) virtualInput).getExpression()
- .equals(input.getExpression());
+ inputMatches = virtualInput.equals(input);
}
final boolean matches = inputMatches
@@ -177,11 +172,11 @@ public class HdrHistogramQuantileSqlAggregator implements SqlAggregator {
}
// No existing match found. Create a new one.
- final List virtualColumns = new ArrayList<>();
+ // Removed in the new version: final List<VirtualColumn> virtualColumns = new ArrayList<>();
if (input.isDirectColumnAccess()) {
// the argument is a Histogram object
- if (rowSignature.getColumnType(input.getDirectColumn()).orElse(null) == ValueType.COMPLEX) {
+ if (rowSignature.getColumnType(input.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) {
aggregatorFactory = new HdrHistogramMergeAggregatorFactory(
histogramName,
input.getDirectColumn(),
@@ -201,12 +196,11 @@ public class HdrHistogramQuantileSqlAggregator implements SqlAggregator {
);
}
} else {
- final VirtualColumn virtualColumn =
- virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, input, SqlTypeName.BIGINT);
- virtualColumns.add(virtualColumn);
+ final String virtualColumnName =
+ virtualColumnRegistry.getOrCreateVirtualColumnForExpression(input, ColumnType.LONG);
aggregatorFactory = new HdrHistogramAggregatorFactory(
histogramName,
- virtualColumn.getOutputName(),
+ virtualColumnName,
lowestDiscernibleValue,
highestTrackableValue,
numberOfSignificantValueDigits,
@@ -234,7 +228,6 @@ public class HdrHistogramQuantileSqlAggregator implements SqlAggregator {
}
return Aggregation.create(
- virtualColumns,
ImmutableList.of(aggregatorFactory),
new HdrHistogramToQuantilePostAggregator(name, histogramName, probability)
);
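A condensed sketch of the new input-equivalence check above, assuming Druid 26's findVirtualColumnExpressions; the real code compares against the factory's fieldName, which is its single required field here:

import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;

public class InputMatchSketch {
    static boolean inputMatches(VirtualColumnRegistry registry, AggregatorFactory factory, DruidExpression input) {
        DruidExpression virtualInput = registry
            .findVirtualColumnExpressions(factory.requiredFields())
            .stream()
            .findFirst()
            .orElse(null);
        if (virtualInput == null) {
            // No virtual column backs the factory: the input must be the same direct column.
            return input.isDirectColumnAccess() && input.getDirectColumn().equals(factory.requiredFields().get(0));
        }
        // Otherwise the expressions themselves must match.
        return virtualInput.equals(input);
    }
}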
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantilesOperatorConversion.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantilesOperatorConversion.java
index ce75587..a14a15e 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantilesOperatorConversion.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantilesOperatorConversion.java
@@ -62,50 +62,30 @@ public class HdrHistogramQuantilesOperatorConversion implements SqlOperatorConve
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final float[] args = new float[operands.size() - 1];
- PostAggregator postAgg = null;
- int operandCounter = 0;
- for (RexNode operand : operands) {
- final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator(
- plannerContext,
- rowSignature,
- operand,
- postAggregatorVisitor
- );
- if (convertedPostAgg == null) {
- if (operandCounter > 0) {
- try {
- if (!operand.isA(SqlKind.LITERAL)) {
- return null;
- }
- float arg = ((Number) RexLiteral.value(operand)).floatValue();
- args[operandCounter - 1] = arg;
- }
- catch (ClassCastException cce) {
- return null;
- }
- } else {
- return null;
- }
- } else {
- if (operandCounter == 0) {
- postAgg = convertedPostAgg;
- } else {
- if (!operand.isA(SqlKind.LITERAL)) {
- return null;
- }
- }
- }
- operandCounter++;
+ // In the new version, the sketch post-aggregator is taken directly from the first operand.
+ final PostAggregator inputSketchPostAgg = OperatorConversions.toPostAggregator(
+ plannerContext,
+ rowSignature,
+ operands.get(0),
+ postAggregatorVisitor,
+ true
+ );
+
+ if (inputSketchPostAgg == null) {
+ return null;
}
- if (postAgg == null) {
- return null;
+ // Parse the remaining literal operands directly.
+ for (int i = 1; i < operands.size(); i++) {
+ RexNode operand = operands.get(i);
+ float arg = ((Number) RexLiteral.value(operand)).floatValue();
+ args[i - 1] = arg;
}
return new HdrHistogramToQuantilesPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
- ((FieldAccessPostAggregator)postAgg).getFieldName(),
+ ((FieldAccessPostAggregator)inputSketchPostAgg).getFieldName(),
args
);
}
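The rewritten loop assumes every trailing operand is a numeric literal and would throw on anything else, whereas the old code returned null. A standalone sketch of the extraction with that guard kept (Calcite classes only):

import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import java.util.List;

public class LiteralArgsSketch {
    // Returns the probabilities after the sketch operand, or null if any operand is not a literal.
    static float[] parseArgs(List<RexNode> operands) {
        float[] args = new float[operands.size() - 1];
        for (int i = 1; i < operands.size(); i++) {
            RexNode operand = operands.get(i);
            if (!operand.isA(SqlKind.LITERAL)) {
                return null; // guard dropped in the rewrite above
            }
            args[i - 1] = ((Number) RexLiteral.value(operand)).floatValue();
        }
        return args;
    }
}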
diff --git a/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramBufferAggregatorTest.java b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramBufferAggregatorTest.java
index de409c8..8c3e0b2 100644
--- a/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramBufferAggregatorTest.java
+++ b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramBufferAggregatorTest.java
@@ -2,17 +2,13 @@ package org.apache.druid.query.aggregation.sketch.HdrHistogram;
import com.google.common.collect.ImmutableMap;
import org.HdrHistogram.*;
-import org.apache.datasketches.theta.Sketches;
-import org.apache.datasketches.theta.UpdateSketch;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
-import org.apache.druid.query.aggregation.datasketches.theta.SketchHolder;
-import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
import org.apache.druid.query.groupby.epinephelinae.GrouperTestUtil;
-import org.apache.druid.query.groupby.epinephelinae.TestColumnSelectorFactory;
+import org.apache.druid.query.groupby.epinephelinae.GroupByTestColumnSelectorFactory;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.junit.Assert;
import org.junit.Test;
@@ -230,7 +226,7 @@ public class HdrHistogramBufferAggregatorTest {
@Test
public void testMergeAggregatorRelocate() {
- final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
+ final GroupByTestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
HistogramSketch histogram = new HistogramSketch(3);
for (int i = 0; i < 100000; i++) {
histogram.recordValue(i);
@@ -252,7 +248,7 @@ public class HdrHistogramBufferAggregatorTest {
@Test
public void testAggregatorRelocate() {
- final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
+ final GroupByTestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
HistogramSketch histogram = new HistogramSketch(3);
for (int i = 0; i < 100000; i++) {
histogram.recordValue(i);
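Note: for readers unfamiliar with the sketch type these tests drive, here is a minimal standalone exercise limited to the calls that actually appear in this patch (the single-argument constructor takes the number of significant value digits):

import org.HdrHistogram.HistogramSketch;

public class HistogramSketchDemo {
  public static void main(String[] args) {
    // 3 = numberOfSignificantValueDigits, as in the relocate tests above.
    HistogramSketch histogram = new HistogramSketch(3);
    for (int i = 0; i < 100_000; i++) {
      histogram.recordValue(i);
    }
    // getTotalCount() reports how many values were recorded.
    System.out.println("total count = " + histogram.getTotalCount()); // 100000
  }
}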
diff --git a/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregatorTest.java b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregatorTest.java
index 054dc05..639b95f 100644
--- a/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregatorTest.java
+++ b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregatorTest.java
@@ -1,12 +1,15 @@
package org.apache.druid.query.aggregation.sketch.HdrHistogram.sql;
+import com.alibaba.fastjson2.JSON;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.inject.Injector;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
@@ -27,66 +30,49 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
-import org.apache.druid.sql.SqlLifecycle;
-import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.QueryTestRunner;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
-import org.apache.druid.sql.calcite.util.CalciteTestBase;
-import org.apache.druid.sql.calcite.util.CalciteTests;
-import org.apache.druid.sql.calcite.util.QueryLogHook;
-import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
+import org.apache.druid.sql.calcite.util.*;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.*;
import org.junit.rules.TemporaryFolder;
+import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
-public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
- private static final String DATA_SOURCE = "foo";
-
- private static QueryRunnerFactoryConglomerate conglomerate;
- private static Closer resourceCloser;
- private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
- private static final Map QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
- PlannerContext.CTX_SQL_QUERY_ID, "dummy"
- );
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Rule
- public QueryLogHook queryLogHook = QueryLogHook.create();
-
- private SpecificSegmentsQuerySegmentWalker walker;
- private SqlLifecycleFactory sqlLifecycleFactory;
-
- @BeforeClass
- public static void setUpClass() {
- resourceCloser = Closer.create();
- conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
+public class HdrHistogramQuantileSqlAggregatorTest extends BaseCalciteQueryTest {
+ @Override
+ public void gatherProperties(Properties properties)
+ {
+ super.gatherProperties(properties);
}
- @AfterClass
- public static void tearDownClass() throws IOException {
- resourceCloser.close();
+ @Override
+ public void configureGuice(DruidInjectorBuilder builder)
+ {
+ super.configureGuice(builder);
+ builder.addModule(new HdrHistogramModule());
}
public static final List ROWS1 = ImmutableList.of(
- CalciteTests.createRow(
+ TestDataBuilder.createRow(
ImmutableMap.builder()
.put("t", "2000-01-01")
.put("m1", "1")
@@ -96,7 +82,7 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
.put("dim3", ImmutableList.of("a", "b"))
.build()
),
- CalciteTests.createRow(
+ TestDataBuilder.createRow(
ImmutableMap.builder()
.put("t", "2000-01-02")
.put("m1", "2.0")
@@ -106,7 +92,7 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
.put("dim3", ImmutableList.of("b", "c"))
.build()
),
- CalciteTests.createRow(
+ TestDataBuilder.createRow(
ImmutableMap.builder()
.put("t", "2000-01-03")
.put("m1", "3.0")
@@ -116,7 +102,7 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
.put("dim3", ImmutableList.of("d"))
.build()
),
- CalciteTests.createRow(
+ TestDataBuilder.createRow(
ImmutableMap.builder()
.put("t", "2001-01-01")
.put("m1", "4.0")
@@ -126,7 +112,7 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
.put("dim3", ImmutableList.of(""))
.build()
),
- CalciteTests.createRow(
+ TestDataBuilder.createRow(
ImmutableMap.builder()
.put("t", "2001-01-02")
.put("m1", "5.0")
@@ -136,7 +122,7 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
.put("dim3", ImmutableList.of())
.build()
),
- CalciteTests.createRow(
+ TestDataBuilder.createRow(
ImmutableMap.builder()
.put("t", "2001-01-03")
.put("m1", "6.0")
@@ -146,15 +132,20 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
)
);
- @Before
- public void setUp() throws Exception {
+ @SuppressWarnings("resource")
+ @Override
+ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
+ final QueryRunnerFactoryConglomerate conglomerate,
+ final JoinableFactoryWrapper joinableFactory,
+ final Injector injector
+ ) throws IOException {
HdrHistogramModule.registerSerde();
for (Module mod : new HdrHistogramModule().getJacksonModules()) {
CalciteTests.getJsonMapper().registerModule(mod);
TestHelper.JSON_MAPPER.registerModule(mod);
}
-
- final QueryableIndex index = IndexBuilder.create()
+ //final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/testIndex-6201298"));
+ /*final QueryableIndex index = IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder())
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(
@@ -176,81 +167,183 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
)
//.rows(CalciteTests.ROWS1)
.rows(ROWS1)
- .buildMMappedIndex();
+ .buildMMappedIndex();*/
- walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
- DataSegment.builder()
- .dataSource(DATA_SOURCE)
- .interval(index.getDataInterval())
- .version("1")
- .shardSpec(new LinearShardSpec(0))
- .size(0)
- .build(),
- index
- );
+ String[] files = new String[]{
+ "D:\\doc\\datas\\statistics_rule_segments\\2023-10-16T00_00_00.000Z_2023-10-17T00_00_00.000Z\\2023-10-16T07_51_47.981Z\\0\\17a457e4-599d-49c2-86e7-6655851bb99a\\index",
+ "D:\\doc\\datas\\statistics_rule_segments\\2023-10-15T00_00_00.000Z_2023-10-16T00_00_00.000Z\\2023-10-15T00_00_04.240Z\\15\\9a766f6c-779d-4f9f-9ff5-6a12c19b8c6c\\index"
+ };
+ files = new String[]{
+ "D:/doc/datas/testIndex-6201298"
+ };
+ SpecificSegmentsQuerySegmentWalker walker = new SpecificSegmentsQuerySegmentWalker(conglomerate);
- final PlannerConfig plannerConfig = new PlannerConfig();
- final DruidOperatorTable operatorTable = new DruidOperatorTable(
- ImmutableSet.of(
- new HdrHistogramQuantileSqlAggregator(),
- new HdrHistogramObjectSqlAggregator()
- ),
- ImmutableSet.of(
- new HdrHistogramQuantilesOperatorConversion(),
- new HdrHistogramPercentilesOperatorConversion()
- )
- );
- SchemaPlus rootSchema =
- CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+ for (int i = 0; i < files.length; i++) {
+ QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File(files[i]));
+ walker.add(
+ DataSegment.builder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .interval(index.getDataInterval())
+ .version("1")
+ .shardSpec(new LinearShardSpec(i))
+ .size(0)
+ .build(),
+ index
+ );
+ }
- sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
- new PlannerFactory(
- rootSchema,
- CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
- operatorTable,
- CalciteTests.createExprMacroTable(),
- plannerConfig,
- AuthTestUtils.TEST_AUTHORIZER_MAPPER,
- CalciteTests.getJsonMapper(),
- CalciteTests.DRUID_SCHEMA_NAME
- )
- );
+ return walker;
}
- @After
- public void tearDown() throws Exception {
- walker.close();
- walker = null;
+ @Test
+ public void testCount0() throws Exception {
+ String sql = "select count(1) cnt, APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2) from druid.foo where dim1 = 'aaa'";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List
+
diff --git a/druid-hlld/pom.xml b/druid-hlld/pom.xml
--- a/druid-hlld/pom.xml
+++ b/druid-hlld/pom.xml
+
+ org.easymock
+ easymock
+ 4.3
+ test
+
+
org.apache.druid
druid-processing
@@ -42,9 +50,17 @@
org.apache.druid
- druid-benchmarks
+ druid-server
${druid.version}
test
+ test-jar
+
+
+ org.apache.druid
+ druid-sql
+ ${druid.version}
+ test-jar
+ test
junit
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java
index ea1fad9..b892494 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java
@@ -1,256 +1,287 @@
-package org.apache.druid.query.aggregation.sketch.hlld;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.zdjz.galaxy.sketch.hlld.Hll;
-import com.zdjz.galaxy.sketch.hlld.HllUnion;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.aggregation.*;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.segment.ColumnSelectorFactory;
-import org.apache.druid.segment.ColumnValueSelector;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Objects;
-
-public class HllAggregatorFactory extends AggregatorFactory {
- private static final Logger LOG = new Logger(HllAggregatorFactory.class);
- public static final boolean DEFAULT_ROUND = false;
- public static final int DEFAULT_PRECISION = 12;
-
- static final Comparator COMPARATOR = Comparator.nullsFirst(Comparator.comparingDouble(Hll::size));
-
- protected final String name;
- protected final String fieldName;
- protected final int precision;
- protected final boolean round;
-
- public HllAggregatorFactory(
- @JsonProperty("name") final String name,
- @JsonProperty("fieldName") final String fieldName,
- @JsonProperty("precision") @Nullable final Integer precision,
- @JsonProperty("round") @Nullable final Boolean round
- ) {
- if (name == null) {
- throw new IAE("Must have a valid, non-null aggregator name");
- }
- if (fieldName == null) {
- throw new IAE("Parameter fieldName must be specified");
- }
- this.name = name;
- this.fieldName = fieldName;
- this.precision = precision == null ? DEFAULT_PRECISION : precision;
- this.round = round == null ? DEFAULT_ROUND : round;
- }
-
- @Override
- public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) {
- final ColumnValueSelector selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
- return new HllAggregator(selector, precision);
- }
-
- @Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
- final ColumnValueSelector selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
- return new HllBufferAggregator(
- selector,
- precision
- );
- }
-
- @Override
- public Comparator getComparator() {
- return COMPARATOR;
- }
-
- @Override
- public Object combine(Object lhs, Object rhs) {
- if(lhs == null){
- return rhs;
- }else if(rhs == null){
- return lhs;
- }else{
- final HllUnion union = new HllUnion(precision);
- union.update((Hll) lhs);
- union.update((Hll) rhs);
- Hll result = union.getResult();
- return result;
- }
- }
-
- @Override
- public AggregateCombiner makeAggregateCombiner() {
- return new ObjectAggregateCombiner() {
- private HllUnion union = null;
-
- @Override
- public void reset(ColumnValueSelector selector) {
- //LOG.error("HllAggregateCombiner reset:" + "-" + Thread.currentThread().getId() + "-" + this);
- //union.reset();
- union = null;
- fold(selector);
- }
-
- @Override
- public void fold(ColumnValueSelector selector) {
- //LOG.error("HllAggregateCombiner fold:" + "-" + Thread.currentThread().getId() + "-" + this);
- final Hll hll = (Hll) selector.getObject();
- if(hll != null){
- if(union == null){
- union = new HllUnion(precision);
- }
- union.update(hll);
- }else{
- //LOG.error("HllAggregateCombiner fold_null:" + "-" + Thread.currentThread().getId() + "-" + this);
- }
- }
-
- @Override
- public Class classOfObject() {
- return Hll.class;
- }
-
- @Nullable
- @Override
- public Hll getObject() {
- //LOG.error("HllAggregateCombiner get:" + "-" + Thread.currentThread().getId() + "-" + this);
- if(union == null){
- return null;
- }else{
- Hll result = union.getResult();
- /*if(result.size() == 0){
- return null;
- }*/
- return result;
- }
- }
- };
- }
-
- @Override
- public AggregatorFactory getCombiningFactory() {
- // Be careful not to get this wrong; it is a big pitfall
- return new HllMergeAggregatorFactory(name, name, precision, round);
- }
-
- @Override
- public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException {
- if (other.getName().equals(this.getName()) && other instanceof HllAggregatorFactory) {
- HllAggregatorFactory castedOther = (HllAggregatorFactory) other;
-
- return new HllMergeAggregatorFactory(name, name,
- Math.max(precision, castedOther.precision),
- round || castedOther.round
- );
- } else {
- throw new AggregatorFactoryNotMergeableException(this, other);
- }
- }
-
- @Override
- public List getRequiredColumns() {
- return Collections.singletonList(
- new HllAggregatorFactory(fieldName, fieldName, precision, round)
- );
- }
-
- @Override
- public Object deserialize(Object object) {
- return HllUtils.deserializeHll(object);
- }
-
- @Nullable
- @Override
- public Object finalizeComputation(@Nullable Object object) {
- if (object == null) {
- return null;
- }
- final Hll hll = (Hll) object;
- final double estimate = hll.size();
-
- if (round) {
- return Math.round(estimate);
- } else {
- return estimate;
- }
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public String getFieldName() {
- return fieldName;
- }
-
- @JsonProperty
- public int getPrecision() {
- return precision;
- }
-
- @JsonProperty
- public boolean isRound() {
- return round;
- }
-
- @Override
- public String getTypeName() {
- return HllModule.HLLD_BUILD_TYPE_NAME;
- }
-
- @Override
- public List requiredFields() {
- return Collections.singletonList(fieldName);
- }
-
- @Override
- public int getMaxIntermediateSize() {
- return Hll.getUpdatableSerializationBytes(precision);
- }
-
- @Override
- public byte[] getCacheKey() {
- return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_BUILD_CACHE_TYPE_ID)
- .appendString(name).appendString(fieldName)
- .appendInt(precision).appendBoolean(round)
- .build();
- }
-
- @Override
- public boolean equals(final Object o){
- if (this == o) {
- return true;
- }
- if (o == null || !getClass().equals(o.getClass())) {
- return false;
- }
-
- HllAggregatorFactory that = (HllAggregatorFactory) o;
- return name.equals(that.name) && fieldName.equals(that.fieldName) &&
- precision == that.precision &&
- round == that.round
- ;
- }
-
- @Override
- public int hashCode(){
- return Objects.hash(name, fieldName, precision, round);
- }
-
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "{" +
- "name='" + name + '\'' +
- ", fieldName='" + fieldName + '\'' +
- ", precision=" + precision +
- ", round=" + round +
- '}';
- }
-}
+package org.apache.druid.query.aggregation.sketch.hlld;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.zdjz.galaxy.sketch.hlld.Hll;
+import com.zdjz.galaxy.sketch.hlld.HllUnion;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.*;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class HllAggregatorFactory extends AggregatorFactory {
+ private static final Logger LOG = new Logger(HllAggregatorFactory.class);
+ public static final boolean DEFAULT_ROUND = false;
+ public static final int DEFAULT_PRECISION = 12;
+
+ static final Comparator COMPARATOR = Comparator.nullsFirst(Comparator.comparingDouble(Hll::size));
+
+ protected final String name;
+ protected final String fieldName;
+ protected final int precision;
+ protected final boolean round;
+ protected final int updatableSerializationBytes;
+
+ public HllAggregatorFactory(
+ @JsonProperty("name") final String name,
+ @JsonProperty("fieldName") final String fieldName,
+ @JsonProperty("precision") @Nullable final Integer precision,
+ @JsonProperty("round") @Nullable final Boolean round
+ ) {
+ if (name == null) {
+ throw new IAE("Must have a valid, non-null aggregator name");
+ }
+ if (fieldName == null) {
+ throw new IAE("Parameter fieldName must be specified");
+ }
+ this.name = name;
+ this.fieldName = fieldName;
+ this.precision = precision == null ? DEFAULT_PRECISION : precision;
+ this.round = round == null ? DEFAULT_ROUND : round;
+ this.updatableSerializationBytes = getUpdatableSerializationBytes();
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) {
+ final ColumnValueSelector selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
+ return new HllAggregator(selector, precision);
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
+ final ColumnValueSelector selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
+ return new HllBufferAggregator(
+ selector,
+ precision
+ );
+ }
+
+ @Override
+ public Comparator getComparator() {
+ return COMPARATOR;
+ }
+
+ @Override
+ public Object combine(Object lhs, Object rhs) {
+ if(lhs == null){
+ return rhs;
+ }else if(rhs == null){
+ return lhs;
+ }else{
+ final HllUnion union = new HllUnion(precision);
+ union.update((Hll) lhs);
+ union.update((Hll) rhs);
+ Hll result = union.getResult();
+ return result;
+ }
+ }
+
+ @Override
+ public AggregateCombiner makeAggregateCombiner() {
+ return new ObjectAggregateCombiner() {
+ private HllUnion union = null;
+
+ @Override
+ public void reset(ColumnValueSelector selector) {
+ //LOG.error("HllAggregateCombiner reset:" + "-" + Thread.currentThread().getId() + "-" + this);
+ //union.reset();
+ union = null;
+ fold(selector);
+ }
+
+ @Override
+ public void fold(ColumnValueSelector selector) {
+ //LOG.error("HllAggregateCombiner fold:" + "-" + Thread.currentThread().getId() + "-" + this);
+ final Hll hll = (Hll) selector.getObject();
+ if(hll != null){
+ if(union == null){
+ union = new HllUnion(precision);
+ }
+ union.update(hll);
+ }else{
+ //LOG.error("HllAggregateCombiner fold_null:" + "-" + Thread.currentThread().getId() + "-" + this);
+ }
+ }
+
+ @Override
+ public Class classOfObject() {
+ return Hll.class;
+ }
+
+ @Nullable
+ @Override
+ public Hll getObject() {
+ //LOG.error("HllAggregateCombiner get:" + "-" + Thread.currentThread().getId() + "-" + this);
+ if(union == null){
+ return null;
+ }else{
+ Hll result = union.getResult();
+ /*if(result.size() == 0){
+ return null;
+ }*/
+ return result;
+ }
+ }
+ };
+ }
+
+ @Override
+ public AggregatorFactory getCombiningFactory() {
+ // Be careful not to get this wrong; it is a big pitfall
+ return new HllMergeAggregatorFactory(name, name, precision, round);
+ }
+
+ @Override
+ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException {
+ if (other.getName().equals(this.getName()) && other instanceof HllAggregatorFactory) {
+ HllAggregatorFactory castedOther = (HllAggregatorFactory) other;
+
+ return new HllMergeAggregatorFactory(name, name,
+ Math.max(precision, castedOther.precision),
+ round || castedOther.round
+ );
+ }
+
+ throw new AggregatorFactoryNotMergeableException(this, other);
+ }
+
+ @Override
+ public List getRequiredColumns() {
+ return Collections.singletonList(
+ new HllAggregatorFactory(fieldName, fieldName, precision, round)
+ );
+ }
+
+ @Override
+ public AggregatorFactory withName(String newName) {
+ return new HllAggregatorFactory(newName, fieldName, precision, round);
+ }
+
+ @Override
+ public Object deserialize(Object object) {
+ if (object == null) {
+ return null;
+ }
+ return HllUtils.deserializeHll(object);
+ }
+
+ @Override
+ public ColumnType getResultType() {
+ //return round ? ColumnType.LONG : ColumnType.DOUBLE;
+ return getIntermediateType();
+ }
+
+ @Nullable
+ @Override
+ public Object finalizeComputation(@Nullable Object object) {
+ if (object == null) {
+ return null;
+ }
+
+ return object;
+
+ /*final Hll hll = (Hll) object;
+ final double estimate = hll.size();
+
+ if (round) {
+ return Math.round(estimate);
+ } else {
+ return estimate;
+ }*/
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public int getPrecision() {
+ return precision;
+ }
+
+ @JsonProperty
+ public boolean isRound() {
+ return round;
+ }
+
+ /*
+ This method no longer exists; the new version must implement getIntermediateType() instead.
+ @Override
+ public String getTypeName() {
+ return HllModule.HLLD_BUILD_TYPE_NAME;
+ }*/
+
+ @Override
+ public ColumnType getIntermediateType() {
+ return HllModule.BUILD_TYPE;
+ }
+
+ @Override
+ public List requiredFields() {
+ return Collections.singletonList(fieldName);
+ }
+
+ @Override
+ public int getMaxIntermediateSize() {
+ return updatableSerializationBytes == 0 ? getUpdatableSerializationBytes() : updatableSerializationBytes;
+ }
+
+ protected int getUpdatableSerializationBytes(){
+ return Hll.getUpdatableSerializationBytes(precision);
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_BUILD_CACHE_TYPE_ID)
+ .appendString(name).appendString(fieldName)
+ .appendInt(precision).appendBoolean(round)
+ .build();
+ }
+
+ @Override
+ public boolean equals(final Object o){
+ if (this == o) {
+ return true;
+ }
+ if (o == null || !getClass().equals(o.getClass())) {
+ return false;
+ }
+
+ HllAggregatorFactory that = (HllAggregatorFactory) o;
+ return name.equals(that.name) && fieldName.equals(that.fieldName) &&
+ precision == that.precision &&
+ round == that.round
+ ;
+ }
+
+ @Override
+ public int hashCode(){
+ return Objects.hash(name, fieldName, precision, round);
+ }
+
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ ", precision=" + precision +
+ ", round=" + round +
+ '}';
+ }
+}
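Note: the null-tolerant merge that combine() implements is the core invariant of this factory. Extracted as a plain helper (same calls, same semantics, shown only to make the contract obvious):

import com.zdjz.galaxy.sketch.hlld.Hll;
import com.zdjz.galaxy.sketch.hlld.HllUnion;

public class HllMergeSketch {
  static Hll merge(Hll lhs, Hll rhs, int precision) {
    // A null side means "nothing aggregated yet": pass the other side through
    // unchanged instead of allocating a union.
    if (lhs == null) {
      return rhs;
    }
    if (rhs == null) {
      return lhs;
    }
    HllUnion union = new HllUnion(precision);
    union.update(lhs);
    union.update(rhs);
    return union.getResult();
  }
}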
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java
index 6a80f0c..81491f7 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java
@@ -1,59 +1,73 @@
-package org.apache.druid.query.aggregation.sketch.hlld;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.zdjz.galaxy.sketch.hlld.Hll;
-import com.zdjz.galaxy.sketch.hlld.HllUnion;
-import org.apache.druid.query.aggregation.Aggregator;
-import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.segment.ColumnSelectorFactory;
-import org.apache.druid.segment.ColumnValueSelector;
-
-import javax.annotation.Nullable;
-
-public class HllMergeAggregatorFactory extends HllAggregatorFactory{
- public HllMergeAggregatorFactory(
- @JsonProperty("name") final String name,
- @JsonProperty("fieldName") final String fieldName,
- @JsonProperty("precision") @Nullable final Integer precision,
- @JsonProperty("round") @Nullable final Boolean round
- ) {
- super(name, fieldName, precision, round);
- }
-
- @Override
- public String getTypeName(){
- return HllModule.HLLD_TYPE_NAME;
- }
-
- @Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory) {
- final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(getFieldName());
- return new HllMergeAggregator(
- selector,
- precision
- );
- }
-
- @Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
- final ColumnValueSelector selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
- return new HllMergeBufferAggregator(
- selector,
- precision
- );
- }
-
- @Override
- public byte[] getCacheKey() {
- return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_MERGE_CACHE_TYPE_ID)
- .appendString(name).appendString(fieldName)
- .appendInt(precision).appendBoolean(round)
- .build();
- }
-
- @Override
- public int getMaxIntermediateSize() {
- return HllUnion.getUpdatableSerializationBytes(precision);
- }
-}
+package org.apache.druid.query.aggregation.sketch.hlld;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.zdjz.galaxy.sketch.hlld.Hll;
+import com.zdjz.galaxy.sketch.hlld.HllUnion;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+
+public class HllMergeAggregatorFactory extends HllAggregatorFactory{
+ public HllMergeAggregatorFactory(
+ @JsonProperty("name") final String name,
+ @JsonProperty("fieldName") final String fieldName,
+ @JsonProperty("precision") @Nullable final Integer precision,
+ @JsonProperty("round") @Nullable final Boolean round
+ ) {
+ super(name, fieldName, precision, round);
+ }
+
+ /*
+ This method no longer exists; the new version must implement getIntermediateType() instead.
+ @Override
+ public String getTypeName(){
+ return HllModule.HLLD_TYPE_NAME;
+ }*/
+
+ @Override
+ public ColumnType getIntermediateType() {
+ return HllModule.TYPE;
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory metricFactory) {
+ final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(getFieldName());
+ return new HllMergeAggregator(
+ selector,
+ precision
+ );
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
+ final ColumnValueSelector selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
+ return new HllMergeBufferAggregator(
+ selector,
+ precision
+ );
+ }
+
+ @Override
+ public AggregatorFactory withName(String newName) {
+ return new HllMergeAggregatorFactory(newName, fieldName, precision, round);
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_MERGE_CACHE_TYPE_ID)
+ .appendString(name).appendString(fieldName)
+ .appendInt(precision).appendBoolean(round)
+ .build();
+ }
+
+ @Override
+ protected int getUpdatableSerializationBytes() {
+ return HllUnion.getUpdatableSerializationBytes(precision);
+ }
+}
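Note: getMaxIntermediateSize() is now a template method — the base constructor caches getUpdatableSerializationBytes() once, and this merge factory overrides the hook to report the (larger) union footprint. A generic sketch of why this particular virtual-call-from-constructor is safe here (illustrative shape only, not the extension's classes):

// The super constructor assigns `precision` before invoking the hook, and the
// override reads nothing but that superclass field, so the cached value is
// correct. An override touching subclass fields would observe them
// uninitialized -- the usual hazard of virtual calls in constructors.
public class TemplateSizeSketch {
  static class Base {
    protected final int precision;
    protected final int cachedSize;
    Base(int precision) {
      this.precision = precision;   // assigned first
      this.cachedSize = sizeHook(); // virtual call: resolves to the subclass
    }
    protected int sizeHook() { return precision; }
  }
  static class Merge extends Base {
    Merge(int precision) { super(precision); }
    @Override protected int sizeHook() { return precision * 2; } // superclass state only
  }
  public static void main(String[] args) {
    System.out.println(new Merge(12).cachedSize); // 24
  }
}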
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java
index 49879c4..7982dcf 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java
@@ -10,6 +10,7 @@ import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllApproxCountDistinctSqlAggregator;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllEstimateOperatorConversion;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllObjectSqlAggregator;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.sql.guice.SqlBindings;
@@ -24,6 +25,9 @@ public class HllModule implements DruidModule {
public static final String HLLD_TYPE_NAME = "HLLDSketch";
public static final String HLLD_BUILD_TYPE_NAME = "HLLDSketchBuild";
+ public static final ColumnType TYPE = ColumnType.ofComplex(HLLD_TYPE_NAME);
+ public static final ColumnType BUILD_TYPE = ColumnType.ofComplex(HLLD_BUILD_TYPE_NAME);
+
@Override
public void configure(Binder binder) {
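Note: the two new constants exist because Druid 26 models complex types as ColumnType values rather than bare type-name strings, and type checks go through the is(ValueType.COMPLEX) predicate used by the SQL aggregator hunks in this patch. A quick standalone check:

import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;

public class ComplexTypeDemo {
  public static void main(String[] args) {
    // Same construction as HllModule.TYPE above.
    ColumnType type = ColumnType.ofComplex("HLLDSketch");
    System.out.println(type.is(ValueType.COMPLEX)); // true
  }
}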
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java
index 8f4a949..ac4b10f 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java
@@ -1,103 +1,114 @@
-package org.apache.druid.query.aggregation.sketch.hlld;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.zdjz.galaxy.sketch.hlld.Hll;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-public class HllToEstimatePostAggregator implements PostAggregator {
- private final String name;
- private final PostAggregator field;
- private final boolean round;
-
- @JsonCreator
- public HllToEstimatePostAggregator(
- @JsonProperty("name") final String name,
- @JsonProperty("field") final PostAggregator field,
- @JsonProperty("round") boolean round
- ) {
- this.name = name;
- this.field = field;
- this.round = round;
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public PostAggregator getField() {
- return field;
- }
-
- @JsonProperty
- public boolean isRound() {
- return round;
- }
-
- @Override
- public Set getDependentFields() {
- return field.getDependentFields();
- }
-
- @Override
- public Comparator getComparator() {
- return ArithmeticPostAggregator.DEFAULT_COMPARATOR;
- }
-
- @Override
- public Object compute(final Map combinedAggregators) {
- final Hll sketch = (Hll) field.compute(combinedAggregators);
- return round ? Math.round(sketch.size()) : sketch.size();
- }
-
- @Override
- public PostAggregator decorate(final Map aggregators) {
- return this;
- }
-
- @Override
- public String toString() {
- return "HllToEstimatePostAggregator{" +
- "name='" + name + '\'' +
- ", field=" + field +
- ", round=" + round +
- '}';
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof HllToEstimatePostAggregator)) {
- return false;
- }
-
- final HllToEstimatePostAggregator that = (HllToEstimatePostAggregator) o;
- return name.equals(that.name) && field.equals(that.field) && round == that.round;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, field, round);
- }
-
- @Override
- public byte[] getCacheKey() {
- CacheKeyBuilder builder = new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_TO_ESTIMATE_CACHE_TYPE_ID)
- .appendCacheable(field).appendBoolean(round);
- return builder.build();
- }
-
-}
+package org.apache.druid.query.aggregation.sketch.hlld;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.zdjz.galaxy.sketch.hlld.Hll;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class HllToEstimatePostAggregator implements PostAggregator {
+ private final String name;
+ private final PostAggregator field;
+ private final boolean round;
+
+ @JsonCreator
+ public HllToEstimatePostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("field") final PostAggregator field,
+ @JsonProperty("round") boolean round
+ ) {
+ this.name = name;
+ this.field = field;
+ this.round = round;
+ }
+
+ // Method the new version requires implementing
+ @Override
+ public ColumnType getType(ColumnInspector signature) {
+ return round ? ColumnType.LONG : ColumnType.DOUBLE;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public PostAggregator getField() {
+ return field;
+ }
+
+ @JsonProperty
+ public boolean isRound() {
+ return round;
+ }
+
+ @Override
+ public Set getDependentFields() {
+ return field.getDependentFields();
+ }
+
+ @Override
+ public Comparator getComparator() {
+ return ArithmeticPostAggregator.DEFAULT_COMPARATOR;
+ }
+
+ @Override
+ public Object compute(final Map combinedAggregators) {
+ final Hll sketch = (Hll) field.compute(combinedAggregators);
+ if(sketch == null){
+ return round ? 0L : 0D;
+ }
+ return round ? Math.round(sketch.size()) : sketch.size();
+ }
+
+ @Override
+ public PostAggregator decorate(final Map aggregators) {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "HllToEstimatePostAggregator{" +
+ "name='" + name + '\'' +
+ ", field=" + field +
+ ", round=" + round +
+ '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof HllToEstimatePostAggregator)) {
+ return false;
+ }
+
+ final HllToEstimatePostAggregator that = (HllToEstimatePostAggregator) o;
+ return name.equals(that.name) && field.equals(that.field) && round == that.round;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, field, round);
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ CacheKeyBuilder builder = new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_TO_ESTIMATE_CACHE_TYPE_ID)
+ .appendCacheable(field).appendBoolean(round);
+ return builder.build();
+ }
+
+}
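Note: getType() is the hook the 26.x planner uses to learn a post-aggregator's output type, so the round flag now drives the reported SQL type. Since the implementation above ignores its ColumnInspector argument, a quick standalone check can pass null (real callers supply a row signature; this sketch assumes it sits in the same package as HllToEstimatePostAggregator):

import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.segment.column.ColumnType;

public class EstimateTypeDemo {
  public static void main(String[] args) {
    HllToEstimatePostAggregator rounded = new HllToEstimatePostAggregator(
        "estimate",
        new FieldAccessPostAggregator("hll", "hll"),
        true
    );
    System.out.println(ColumnType.LONG.equals(rounded.getType(null))); // true
  }
}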
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java
index 4971063..c35b087 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java
@@ -5,36 +5,44 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.*;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
-import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
+import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import java.util.Collections;
-import java.util.List;
public class HllApproxCountDistinctSqlAggregator extends HllBaseSqlAggregator {
private static final SqlAggFunction FUNCTION_INSTANCE = new CPCSketchApproxCountDistinctSqlAggFunction();
private static final String NAME = "APPROX_COUNT_DISTINCT_HLLD";
+ public HllApproxCountDistinctSqlAggregator(){
+ super(true);
+ }
+
@Override
public SqlAggFunction calciteFunction() {
return FUNCTION_INSTANCE;
}
+ // The new version drops the virtualColumns parameter
@Override
protected Aggregation toAggregation(
String name,
boolean finalizeAggregations,
- List virtualColumns,
AggregatorFactory aggregatorFactory
) {
return Aggregation.create(
- virtualColumns,
Collections.singletonList(aggregatorFactory),
//Presumably this indicates whether this is the outermost (finalizing) call
- finalizeAggregations ? new FinalizingFieldAccessPostAggregator(
+ finalizeAggregations ? new HllToEstimatePostAggregator(
name,
- aggregatorFactory.getName()
+ new FieldAccessPostAggregator(
+ aggregatorFactory.getName(),
+ aggregatorFactory.getName()
+ ),
+ ((HllAggregatorFactory)aggregatorFactory).isRound()
) : null
);
}
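Note: this swap is forced by the factory change earlier in the patch. finalizeComputation() now passes the sketch through unchanged, so FinalizingFieldAccessPostAggregator would surface a raw Hll instead of a number; wrapping the field access in the estimate post-aggregator restores a numeric result. As a self-contained helper (illustrative only):

import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator;

public class EstimateFinalizerSketch {
  static PostAggregator estimateFinalizer(String outputName, AggregatorFactory factory) {
    return new HllToEstimatePostAggregator(
        outputName,
        // Reference the aggregator's intermediate (sketch) output by name...
        new FieldAccessPostAggregator(factory.getName(), factory.getName()),
        // ...and round exactly when the underlying factory is configured to.
        ((HllAggregatorFactory) factory).isRound()
    );
  }
}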
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java
index 4bdcf82..a065a4e 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java
@@ -2,6 +2,7 @@ package org.apache.druid.query.aggregation.sketch.hlld.sql;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
@@ -14,6 +15,7 @@ import org.apache.druid.query.aggregation.sketch.hlld.HllMergeAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
@@ -29,6 +31,13 @@ import java.util.ArrayList;
import java.util.List;
public abstract class HllBaseSqlAggregator implements SqlAggregator {
+
+ private final boolean finalizeSketch;
+
+ protected HllBaseSqlAggregator(boolean finalizeSketch){
+ this.finalizeSketch = finalizeSketch;
+ }
+
@Nullable
@Override
public Aggregation toDruidAggregation(
@@ -93,13 +102,14 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
round = HllAggregatorFactory.DEFAULT_ROUND;
}
- final List virtualColumns = new ArrayList<>();
+ // The new version removes: final List virtualColumns = new ArrayList<>();
final AggregatorFactory aggregatorFactory;
- final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
+ //final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
+ final String aggregatorName = finalizeSketch ? Calcites.makePrefixedName(name, "a") : name;
- // Input is a Cpc; return HllMergeAggregatorFactory
+ // Input is an Hll; return HllMergeAggregatorFactory
if (columnArg.isDirectColumnAccess()
- && rowSignature.getColumnType(columnArg.getDirectColumn()).orElse(null) == ValueType.COMPLEX) {
+ && rowSignature.getColumnType(columnArg.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) {
// This is the concrete aggregator factory
aggregatorFactory = new HllMergeAggregatorFactory(
aggregatorName,
@@ -109,10 +119,10 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
);
} else {
- // Input is a regular column; build with HllAggregatorFactory
- final SqlTypeName sqlTypeName = columnRexNode.getType().getSqlTypeName();
- final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName);
+ final RelDataType dataType = columnRexNode.getType();
+ final ColumnType inputType = Calcites.getColumnTypeForRelDataType(dataType);
if (inputType == null) {
- throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName);
+ throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", dataType.getSqlTypeName(), aggregatorName);
}
final DimensionSpec dimensionSpec;
@@ -120,27 +130,34 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
if (columnArg.isDirectColumnAccess()) {
dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null, inputType);
} else {
- VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
- plannerContext,
+ String virtualColumnName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
columnArg,
- sqlTypeName
+ dataType
);
- dimensionSpec = new DefaultDimensionSpec(virtualColumn.getOutputName(), null, inputType);
- virtualColumns.add(virtualColumn);
+ dimensionSpec = new DefaultDimensionSpec(virtualColumnName, null, inputType);
}
- aggregatorFactory = new HllAggregatorFactory(
- aggregatorName,
- dimensionSpec.getDimension(),
- precision,
- round
- );
+ // New-version check: the input may itself be an Hll complex column
+ if (inputType.is(ValueType.COMPLEX)) {
+ aggregatorFactory = new HllMergeAggregatorFactory(
+ aggregatorName,
+ dimensionSpec.getOutputName(),
+ precision,
+ round
+ );
+ } else {
+ aggregatorFactory = new HllAggregatorFactory(
+ aggregatorName,
+ dimensionSpec.getDimension(),
+ precision,
+ round
+ );
+ }
}
return toAggregation(
name,
- finalizeAggregations,
- virtualColumns,
+ finalizeSketch,
aggregatorFactory
);
}
@@ -148,7 +165,6 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
protected abstract Aggregation toAggregation(
String name,
boolean finalizeAggregations,
- List virtualColumns,
AggregatorFactory aggregatorFactory
);
}
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java
index 071d41b..41e38cb 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java
@@ -13,16 +13,15 @@ import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator;
import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
-import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.OperatorConversions;
-import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
+import org.apache.druid.sql.calcite.expression.*;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
-public class HllEstimateOperatorConversion extends DirectOperatorConversion {
+// Produces a PostAggregator; toDruidExpression returns null. In effect a post-aggregation UDF behaves differently from an ordinary UDF.
+// The new version changes the supertype directly: implement SqlOperatorConversion instead of extending DirectOperatorConversion
+public class HllEstimateOperatorConversion implements SqlOperatorConversion {
private static final String FUNCTION_NAME = "HLLD_ESTIMATE";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
@@ -32,9 +31,7 @@ public class HllEstimateOperatorConversion extends DirectOperatorConversion {
.returnTypeInference(ReturnTypes.DOUBLE)
.build();
- public HllEstimateOperatorConversion() {
- super(SQL_FUNCTION, FUNCTION_NAME);
- }
+ // The new version no longer needs the constructor
@Override
public SqlOperator calciteOperator() {
@@ -63,7 +60,8 @@ public class HllEstimateOperatorConversion extends DirectOperatorConversion {
plannerContext,
rowSignature,
operands.get(0),
- postAggregatorVisitor
+ postAggregatorVisitor,
+ true // extra parameter in the new version
);
if (firstOperand == null) {
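Note: what "post-aggregation UDF vs ordinary UDF" means concretely is that a SqlOperatorConversion yielding only a PostAggregator declines expression planning. The override is not shown in this hunk, but the class must carry something equivalent to satisfy the interface; a sketch, assuming the standard Druid 26 signature:

import org.apache.calcite.rex.RexNode;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.planner.PlannerContext;

// Inside HllEstimateOperatorConversion: returning null tells the planner this
// function cannot be rewritten as a row-level Druid expression, so only the
// toPostAggregator() path above applies.
@Override
public DruidExpression toDruidExpression(
    PlannerContext plannerContext,
    RowSignature rowSignature,
    RexNode rexNode
) {
  return null;
}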
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java
index 58bbd45..f0e7da6 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java
@@ -5,16 +5,18 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.*;
import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import java.util.Collections;
-import java.util.List;
public class HllObjectSqlAggregator extends HllBaseSqlAggregator {
private static final SqlAggFunction FUNCTION_INSTANCE = new CpcSketchSqlAggFunction();
private static final String NAME = "HLLD";
+ public HllObjectSqlAggregator(){
+ super(false);
+ }
+
@Override
public SqlAggFunction calciteFunction() {
return FUNCTION_INSTANCE;
@@ -24,11 +26,9 @@ public class HllObjectSqlAggregator extends HllBaseSqlAggregator {
protected Aggregation toAggregation(
String name,
boolean finalizeAggregations,
- List virtualColumns,
AggregatorFactory aggregatorFactory
) {
return Aggregation.create(
- virtualColumns,
Collections.singletonList(aggregatorFactory),
null
);
diff --git a/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java b/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java
index 6a9f3a1..a28d14f 100644
--- a/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java
+++ b/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java
@@ -1,311 +1,429 @@
-package org.apache.druid.query.aggregation.sketch.hlld.sql;
-
-
-import com.fasterxml.jackson.databind.Module;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.query.QueryRunnerFactoryConglomerate;
-import org.apache.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
-import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
-import org.apache.druid.query.aggregation.sketch.hlld.HllModule;
-import org.apache.druid.segment.IndexBuilder;
-import org.apache.druid.segment.QueryableIndex;
-import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.druid.server.QueryStackTests;
-import org.apache.druid.server.security.AuthTestUtils;
-import org.apache.druid.server.security.AuthenticationResult;
-import org.apache.druid.sql.SqlLifecycle;
-import org.apache.druid.sql.SqlLifecycleFactory;
-import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
-import org.apache.druid.sql.calcite.planner.PlannerConfig;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.planner.PlannerFactory;
-import org.apache.druid.sql.calcite.util.CalciteTestBase;
-import org.apache.druid.sql.calcite.util.CalciteTests;
-import org.apache.druid.sql.calcite.util.QueryLogHook;
-import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.junit.*;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
- private static final String DATA_SOURCE = "foo";
- private static final boolean ROUND = true;
- private static final Map QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
- PlannerContext.CTX_SQL_QUERY_ID, "dummy"
- );
- private static QueryRunnerFactoryConglomerate conglomerate;
- private static Closer resourceCloser;
- private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Rule
- public QueryLogHook queryLogHook = QueryLogHook.create(TestHelper.JSON_MAPPER);
-
- private SpecificSegmentsQuerySegmentWalker walker;
- private SqlLifecycleFactory sqlLifecycleFactory;
-
- @BeforeClass
- public static void setUpClass() {
- resourceCloser = Closer.create();
- conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
- }
-
- @AfterClass
- public static void tearDownClass() throws IOException {
- resourceCloser.close();
- }
-
- @Before
- public void setUp() throws Exception {
- HllModule.registerSerde();
- for (Module mod : new HllModule().getJacksonModules()) {
- CalciteTests.getJsonMapper().registerModule(mod);
- TestHelper.JSON_MAPPER.registerModule(mod);
- }
-
- final QueryableIndex index = IndexBuilder.create()
- .tmpDir(temporaryFolder.newFolder())
- .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
- .schema(
- new IncrementalIndexSchema.Builder()
- .withMetrics(
- new CountAggregatorFactory("cnt"),
- new DoubleSumAggregatorFactory("m1", "m1"),
- new HllAggregatorFactory(
- "hll_dim1",
- "dim1",
- null,
- ROUND
- )
- )
- .withRollup(false)
- .build()
- )
- .rows(CalciteTests.ROWS1)
- .buildMMappedIndex();
-
- walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
- DataSegment.builder()
- .dataSource(DATA_SOURCE)
- .interval(index.getDataInterval())
- .version("1")
- .shardSpec(new LinearShardSpec(0))
- .size(0)
- .build(),
- index
- );
-
- final PlannerConfig plannerConfig = new PlannerConfig();
- final DruidOperatorTable operatorTable = new DruidOperatorTable(
- ImmutableSet.of(
- new HllApproxCountDistinctSqlAggregator(),
- new HllObjectSqlAggregator()
- ),
- ImmutableSet.of(
- new HllEstimateOperatorConversion()
- )
- );
-
- SchemaPlus rootSchema = CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
- sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
- new PlannerFactory(
- rootSchema,
- CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
- operatorTable,
- CalciteTests.createExprMacroTable(),
- plannerConfig,
- AuthTestUtils.TEST_AUTHORIZER_MAPPER,
- CalciteTests.getJsonMapper(),
- CalciteTests.DRUID_SCHEMA_NAME
- )
- );
- }
-
- @After
- public void tearDown() throws Exception {
- walker.close();
- walker = null;
- }
-
- @Test
- public void testSqlQuery() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
- String sql = "select * from druid.foo";
- final List results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testSqlQuery2() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
- String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
- final List results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testAgg() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
- final String sql = "SELECT\n"
- + " SUM(cnt),\n"
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n"
- + "FROM druid.foo";
-
- final List results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
-
- @Test
- public void testDistinct() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
- final String sql = "SELECT\n"
- + " SUM(cnt),\n"
- + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n" // uppercase
- + " APPROX_COUNT_DISTINCT_HLLD(dim2) FILTER(WHERE dim2 <> ''),\n" // lowercase; also, filtered
- + " APPROX_COUNT_DISTINCT_HLLD(SUBSTRING(dim2, 1, 1)),\n" // on extractionFn
- + " APPROX_COUNT_DISTINCT_HLLD(SUBSTRING(dim2, 1, 1) || 'x'),\n" // on expression
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 16),\n" // on native HllSketch column
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
- + "FROM druid.foo";
-
- final List results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
- @Test
- public void testDistinct2() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
- final String sql = "SELECT\n"
- + " SUM(cnt),\n"
- + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n"
- + " HLLD(dim2),\n"
- + " HLLD(hll_dim1),\n"
- + " HLLD_ESTIMATE(HLLD(dim2)),\n"
- + " HLLD_ESTIMATE(HLLD(dim2), true),\n"
- + " HLLD_ESTIMATE(HLLD(dim1), true),\n"
- + " HLLD_ESTIMATE(HLLD(hll_dim1)),\n" // on native HllSketch column
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
- + "FROM druid.foo";
-
- final List results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
- @Test
- public void testDistinctDebug() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
- final String sql = "SELECT\n"
- + " SUM(cnt),\n"
- + " APPROX_COUNT_DISTINCT_HLLD(dim2)\n"
- + "FROM druid.foo";
-
- final List results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
- @Test
- public void testDeser() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
- final String sql = "SELECT\n"
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1) cnt\n"
- + "FROM druid.foo";
-
- final List results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
-
- @Test
- public void testGroupBy() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
- final String sql = "SELECT cnt,\n"
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt2\n"
- + "FROM druid.foo group by cnt";
-
- final List results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
- @Test
- public void testGroupBy1() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
- final String sql = "SELECT __time,\n"
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
- + "FROM druid.foo group by __time";
-
- final List results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
- @Test
- public void testGroupBy2() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
- final String sql = "SELECT __time,\n"
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
- + "FROM druid.foo group by __time order by cnt desc";
-
- final List results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-}
+package org.apache.druid.query.aggregation.sketch.hlld.sql;
+
+
+import com.alibaba.fastjson2.JSON;
+import com.fasterxml.jackson.databind.Module;
+import com.google.inject.Injector;
+import org.apache.druid.guice.DruidInjectorBuilder;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.aggregation.sketch.hlld.HllModule;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.join.JoinableFactoryWrapper;
+import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.QueryTestRunner;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.junit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+// The parent class changed outright in the new version; the implementation is simpler now.
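+// BaseCalciteQueryTest (Druid 26.x) exposes override hooks (gatherProperties,
+// configureGuice, createQuerySegmentWalker), so the test no longer wires up a
+// PlannerFactory/SqlLifecycleFactory by hand the way the removed 0.18.x version did.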
+public class HllApproxCountDistinctSqlAggregatorTest extends BaseCalciteQueryTest {
+ private static final boolean ROUND = true;
+
+ @Override
+ public void gatherProperties(Properties properties)
+ {
+ super.gatherProperties(properties);
+ }
+
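+ // configureGuice replaces the hand-built DruidOperatorTable of the 0.18.x test
+ // (HllApproxCountDistinctSqlAggregator, HllObjectSqlAggregator,
+ // HllEstimateOperatorConversion): binding HllModule is what makes the
+ // HLLD / HLLD_ESTIMATE / APPROX_COUNT_DISTINCT_HLLD functions visible to the planner.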
+ @Override
+ public void configureGuice(DruidInjectorBuilder builder)
+ {
+ super.configureGuice(builder);
+ builder.addModule(new HllModule());
+ }
+
+ @SuppressWarnings("resource")
+ @Override
+ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
+ final QueryRunnerFactoryConglomerate conglomerate,
+ final JoinableFactoryWrapper joinableFactory,
+ final Injector injector
+ ) throws IOException
+ {
+ HllModule.registerSerde();
+ for (Module mod : new HllModule().getJacksonModules()) {
+ CalciteTests.getJsonMapper().registerModule(mod);
+ TestHelper.JSON_MAPPER.registerModule(mod);
+ }
+
+ // NOTE: loads a pre-built segment from an absolute local path; adjust the path, or
+ // rebuild the index with the commented-out IndexBuilder block below, when running elsewhere.
+ final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/testIndex-1369101812"));
+ //final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/9_index"));
+ /*final QueryableIndex index = IndexBuilder.create()
+ .tmpDir(temporaryFolder.newFolder())
+ .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ new IncrementalIndexSchema.Builder()
+ .withMetrics(
+ new CountAggregatorFactory("cnt"),
+ new DoubleSumAggregatorFactory("m1", "m1"),
+ new HllAggregatorFactory(
+ "hll_dim1",
+ "dim1",
+ null,
+ ROUND
+ )
+ )
+ .withRollup(false)
+ .build()
+ )
+ .rows(TestDataBuilder.ROWS1)
+ .buildMMappedIndex();*/
+
+ return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
+ DataSegment.builder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .interval(index.getDataInterval())
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build(),
+ index
+ );
+ }
+
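+ // A minimal convenience sketch, not part of BaseCalciteQueryTest: every test below
+ // repeats the same build/run/print sequence, which could be factored out like this,
+ // assuming QueryTestRunner.QueryResults keeps exposing its public results field.
+ private List<Object[]> runAndPrint(String sql) {
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ List<Object[]> rows = builder.results().results;
+ for (Object[] row : rows) {
+ System.out.println(Arrays.toString(row));
+ }
+ return rows;
+ }
+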
+ @Test
+ public void testSqlQuery() throws Exception {
+ // Disable vectorization for this exploratory scan (no SUBSTRING here, unlike the upstream test this was copied from).
+ cannotVectorize();
+
+ String[] columns = new String[]{"__time", "dim1", "dim2", "dim3", "cnt", "hll_dim1", "m1"};
+
+ String sql = "select " + String.join(",", columns) + " from druid.foo";
+ QueryTestBuilder builder = testBuilder().sql(sql);
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ Map<String, Object> row = new LinkedHashMap<>();
+ for (int i = 0; i < result.length; i++) {
+ row.put(columns[i], result[i]);
+ }
+ System.out.println(JSON.toJSONString(row));
+ }
+
+ for (int i = 0; i < columns.length; i++) {
+ Object[] values = new Object[results.size()];
+ for (int j = 0; j < results.size(); j++) {
+ values[j] = results.get(j)[i];
+ }
+ System.out.println(columns[i] + ":" + Arrays.toString(values));
+ }
+ }
+
+ @Test
+ public void testSqlQuery11() throws Exception {
+ String sql = "select HLLD(hll_dim1) hll_dim1 from (select hll_dim1 from druid.foo limit 5) t ";
+ //sql = "select HLLD(hll_dim1) hll_dim1 from druid.foo t ";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery12() throws Exception {
+ // Disable vectorization for this exploratory scan.
+ cannotVectorize();
+
+ String sql = "select * from (select * from druid.foo limit 6) t where __time >= '1970-12-15 07:00:28' and __time < '2023-12-15 08:10:28' ";
+ QueryTestBuilder builder = testBuilder().sql(sql);
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery1() throws Exception {
+ // Disable vectorization for this exploratory scan.
+ cannotVectorize();
+
+ String sql = "select dim1 from druid.foo";
+ QueryTestBuilder builder = testBuilder().sql(sql);
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery2() throws Exception {
+ // Nesting an aggregate inside another aggregate fails Calcite validation:
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(HLLD(hll_dim1)), HLLD(hll_dim1) from druid.foo";
+ // -> org.apache.calcite.sql.validate.SqlValidatorException: Aggregate expressions cannot be nested
+ // Workaround: pre-aggregate in a subquery, then aggregate over the result:
+ String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(hll_dim1), HLLD(hll_dim1) from (select HLLD(hll_dim1) hll_dim1 from druid.foo) t";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery3() throws Exception {
+ String sql = "select APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select HLLD(hll_dim1) hll from druid.foo where dim1 = '1') t ";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery5() throws Exception {
+ String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(hll_dim1) hll from druid.foo where dim1 = '1' group by dim1) t group by dim1";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery6() throws Exception {
+ String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
+ //String sql = "select dim1,HLLD_ESTIMATE(HLLD(hll), false) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery62() throws Exception {
+ String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery7() throws Exception {
+ String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1) t group by dim1 limit 10";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testAgg() throws Exception {
+ final String sql = "SELECT\n"
+ + " SUM(cnt),\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n"
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+ @Test
+ public void testDistinct() throws Exception {
+ final String sql = "SELECT\n"
+ + " SUM(cnt),\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n" // uppercase
+ + " APPROX_COUNT_DISTINCT_HLLD(dim2) FILTER(WHERE dim2 <> ''),\n" // lowercase; also, filtered
+ + " APPROX_COUNT_DISTINCT_HLLD(SUBSTRING(dim2, 1, 1)),\n" // on extractionFn
+ + " APPROX_COUNT_DISTINCT_HLLD(SUBSTRING(dim2, 1, 1) || 'x'),\n" // on expression
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 16),\n" // on native HllSketch column
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testDistinct2() throws Exception {
+ final String sql = "SELECT\n"
+ + " SUM(cnt),\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n"
+ + " HLLD(dim2),\n"
+ + " HLLD(hll_dim1),\n"
+ + " HLLD_ESTIMATE(HLLD(dim2)),\n"
+ + " HLLD_ESTIMATE(HLLD(dim2), true),\n"
+ + " HLLD_ESTIMATE(HLLD(dim1), true),\n"
+ + " HLLD_ESTIMATE(HLLD(hll_dim1)),\n" // on native HllSketch column
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+ @Test
+ public void testDistinctDebug2() throws Exception {
+ final String sql = "SELECT\n"
+ + " dim1, dim2\n"
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+ @Test
+ public void testDistinctDebug() throws Exception {
+ final String sql = "SELECT\n"
+ + " SUM(cnt),\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(dim2)\n"
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+ @Test
+ public void testDeser() throws Exception {
+ final String sql = "SELECT\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1) cnt\n"
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+
+ @Test
+ public void testGroupBy() throws Exception {
+ final String sql = "SELECT cnt,\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt2\n"
+ + "FROM druid.foo group by cnt";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testGroupBy1() throws Exception {
+ final String sql = "SELECT __time,\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
+ + "FROM druid.foo group by __time";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+ @Test
+ public void testGroupBy2() throws Exception {
+ final String sql = "SELECT __time,\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
+ + "FROM druid.foo group by __time order by cnt desc";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+}