Merge branch 'druid_26.0.0' into 'main'

Druid 26.0.0

See merge request galaxy/platform/algorithm/druid-extensions!2
李奉超
2024-07-10 01:27:28 +00:00
23 changed files with 2075 additions and 1724 deletions

View File: pom.xml

@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>org.apache.druid.extensions</groupId> <groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-hdrhistogram_0.18.1</artifactId> <artifactId>druid-hdrhistogram_26.0.0</artifactId>
<name>druid-hdrhistogram</name> <name>druid-hdrhistogram</name>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
@@ -14,7 +14,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<druid.version>0.18.1</druid.version> <druid.version>26.0.0</druid.version>
</properties> </properties>
<dependencies> <dependencies>
@@ -45,6 +45,13 @@
</dependency> </dependency>
<!-- Tests --> <!-- Tests -->
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>4.3</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.druid</groupId> <groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId> <artifactId>druid-processing</artifactId>
@@ -54,9 +61,17 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.druid</groupId> <groupId>org.apache.druid</groupId>
<artifactId>druid-benchmarks</artifactId> <artifactId>druid-server</artifactId>
<version>${druid.version}</version> <version>${druid.version}</version>
<scope>test</scope> <scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
<version>${druid.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File: HdrHistogramAggregatorFactory.java

@@ -1,8 +1,6 @@
package org.apache.druid.query.aggregation.sketch.HdrHistogram; package org.apache.druid.query.aggregation.sketch.HdrHistogram;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.HdrHistogram.DirectHistogram;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramSketch; import org.HdrHistogram.HistogramSketch;
import org.HdrHistogram.HistogramUnion; import org.HdrHistogram.HistogramUnion;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
@@ -10,6 +8,7 @@ import org.apache.druid.query.aggregation.*;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Collections; import java.util.Collections;
@@ -20,7 +19,7 @@ import java.util.Objects;
public class HdrHistogramAggregatorFactory extends AggregatorFactory { public class HdrHistogramAggregatorFactory extends AggregatorFactory {
public static final long DEFAULT_LOWEST = 1; public static final long DEFAULT_LOWEST = 1;
public static final long DEFAULT_HIGHEST = 2; public static final long DEFAULT_HIGHEST = 2;
public static final int DEFAULT_SIGNIFICANT = 3; public static final int DEFAULT_SIGNIFICANT = 1;
public static final boolean DEFAULT_AUTO_RESIZE = true; public static final boolean DEFAULT_AUTO_RESIZE = true;
public static final long BUFFER_AUTO_RESIZE_HIGHEST = 100000000L * 1000000L; public static final long BUFFER_AUTO_RESIZE_HIGHEST = 100000000L * 1000000L;
public static final Comparator<HistogramSketch> COMPARATOR = public static final Comparator<HistogramSketch> COMPARATOR =
@@ -32,6 +31,7 @@ public class HdrHistogramAggregatorFactory extends AggregatorFactory {
protected final long highestTrackableValue; protected final long highestTrackableValue;
protected final int numberOfSignificantValueDigits; protected final int numberOfSignificantValueDigits;
protected final boolean autoResize; // default is false protected final boolean autoResize; // default is false
protected final int updatableSerializationBytes;
public HdrHistogramAggregatorFactory( public HdrHistogramAggregatorFactory(
@JsonProperty("name") String name, @JsonProperty("name") String name,
@@ -81,6 +81,7 @@ public class HdrHistogramAggregatorFactory extends AggregatorFactory {
this.highestTrackableValue = highestTrackableValue; this.highestTrackableValue = highestTrackableValue;
this.numberOfSignificantValueDigits = numberOfSignificantValueDigits; this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
this.autoResize = autoResize; this.autoResize = autoResize;
this.updatableSerializationBytes = getUpdatableSerializationBytes();
} }
@Override @Override
@@ -208,15 +209,30 @@ public class HdrHistogramAggregatorFactory extends AggregatorFactory {
); );
} }
@Override
public AggregatorFactory withName(String newName) {
return new HdrHistogramAggregatorFactory(newName, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
}
@Override @Override
public Object deserialize(Object object) { public Object deserialize(Object object) {
if (object == null) {
return null;
}
return HistogramUtils.deserializeHistogram(object); return HistogramUtils.deserializeHistogram(object);
} }
@Override
public ColumnType getResultType() {
//return ColumnType.LONG;
return getIntermediateType();
}
@Nullable @Nullable
@Override @Override
public Object finalizeComputation(@Nullable Object object) { public Object finalizeComputation(@Nullable Object object) {
return object == null ? null : ((HistogramSketch) object).getTotalCount(); //return object == null ? null : ((HistogramSketch) object).getTotalCount();
return object;
} }
@Override @Override
@@ -250,9 +266,16 @@ public class HdrHistogramAggregatorFactory extends AggregatorFactory {
return autoResize; return autoResize;
} }
/*
This method no longer exists; the new version must implement getIntermediateType() instead.
@Override @Override
public String getTypeName() { public String getTypeName() {
return HdrHistogramModule.HDRHISTOGRAM_TYPE_NAME; return HdrHistogramModule.HDRHISTOGRAM_TYPE_NAME;
}*/
@Override
public ColumnType getIntermediateType() {
return HdrHistogramModule.TYPE;
} }
@Override @Override
@@ -263,6 +286,10 @@ public class HdrHistogramAggregatorFactory extends AggregatorFactory {
@Override @Override
public int getMaxIntermediateSize() { public int getMaxIntermediateSize() {
return updatableSerializationBytes == 0? getUpdatableSerializationBytes():updatableSerializationBytes;
}
private int getUpdatableSerializationBytes(){
if(!autoResize){ if(!autoResize){
/*Histogram histogram = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); /*Histogram histogram = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
histogram.setAutoResize(autoResize); histogram.setAutoResize(autoResize);
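The hunks above capture the factory-level migration: the removed getTypeName() gives way to getIntermediateType()/getResultType(), finalizeComputation() now returns the sketch itself, and the new withName() override is provided. A minimal usage sketch of that updated surface, assuming the six-argument constructor implied by the withName() body (name, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize) and illustrative column names:

import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;

public class FactoryTypeApiExample {
    public static void main(String[] args) {
        // Assumed constructor argument order, mirrored from the withName(...) body above.
        AggregatorFactory factory =
                new HdrHistogramAggregatorFactory("latency_hist", "latency_ms", 1L, 100L, 2, true);

        // Druid 26 reports column types as ColumnType values instead of a type-name string.
        ColumnType intermediate = factory.getIntermediateType(); // the complex HdrHistogramSketch type
        ColumnType result = factory.getResultType();             // same as intermediate, since finalizeComputation() returns the sketch

        // withName() must return an equivalent factory under a different output name.
        AggregatorFactory renamed = factory.withName("latency_hist_2");

        System.out.println(intermediate + " " + result + " " + renamed.getName());
    }
}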

View File: HdrHistogramMergeAggregatorFactory.java

@@ -1,9 +1,9 @@
package org.apache.druid.query.aggregation.sketch.HdrHistogram; package org.apache.druid.query.aggregation.sketch.HdrHistogram;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramSketch; import org.HdrHistogram.HistogramSketch;
import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
@@ -48,6 +48,11 @@ public class HdrHistogramMergeAggregatorFactory extends HdrHistogramAggregatorFa
); );
} }
@Override
public AggregatorFactory withName(String newName) {
return new HdrHistogramMergeAggregatorFactory(newName, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
}
@Override @Override
public byte[] getCacheKey() { public byte[] getCacheKey() {
return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_MERGE_CACHE_TYPE_ID) return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_MERGE_CACHE_TYPE_ID)

View File: HdrHistogramModule.java

@@ -7,13 +7,13 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder; import com.google.inject.Binder;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramSketch; import org.HdrHistogram.HistogramSketch;
import org.apache.druid.initialization.DruidModule; import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramObjectSqlAggregator; import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramObjectSqlAggregator;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramPercentilesOperatorConversion; import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramPercentilesOperatorConversion;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramQuantileSqlAggregator; import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramQuantileSqlAggregator;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramQuantilesOperatorConversion; import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramQuantilesOperatorConversion;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.sql.guice.SqlBindings; import org.apache.druid.sql.guice.SqlBindings;
@@ -29,6 +29,7 @@ public class HdrHistogramModule implements DruidModule {
public static final byte QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_CACHE_TYPE_ID = 0x05; public static final byte QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_CACHE_TYPE_ID = 0x05;
public static final String HDRHISTOGRAM_TYPE_NAME = "HdrHistogramSketch"; public static final String HDRHISTOGRAM_TYPE_NAME = "HdrHistogramSketch";
public static final ColumnType TYPE = ColumnType.ofComplex(HDRHISTOGRAM_TYPE_NAME);
public static final ObjectMapper objectMapper = new ObjectMapper(); public static final ObjectMapper objectMapper = new ObjectMapper();

View File: HdrHistogramToPercentilesPostAggregator.java

@@ -9,6 +9,8 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.*; import java.util.*;
@@ -29,6 +31,11 @@ public class HdrHistogramToPercentilesPostAggregator implements PostAggregator {
this.percentileTicksPerHalfDistance = percentileTicksPerHalfDistance; this.percentileTicksPerHalfDistance = percentileTicksPerHalfDistance;
} }
@Override
public ColumnType getType(ColumnInspector signature){
return ColumnType.STRING;
}
@Override @Override
@JsonProperty @JsonProperty
public String getName() { public String getName() {
@@ -49,6 +56,9 @@ public class HdrHistogramToPercentilesPostAggregator implements PostAggregator {
@Override @Override
public Object compute(Map<String, Object> values) { public Object compute(Map<String, Object> values) {
HistogramSketch histogram = (HistogramSketch) values.get(fieldName); HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
if(histogram == null){
return "[]"; //"[]"
}
List<Percentile> percentiles = histogram.percentileList(percentileTicksPerHalfDistance); List<Percentile> percentiles = histogram.percentileList(percentileTicksPerHalfDistance);
return HdrHistogramModule.toJson(percentiles); return HdrHistogramModule.toJson(percentiles);
} }

View File: HdrHistogramToQuantilePostAggregator.java

@@ -9,6 +9,8 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Comparator; import java.util.Comparator;
@@ -36,6 +38,11 @@ public class HdrHistogramToQuantilePostAggregator implements PostAggregator {
} }
} }
@Override
public ColumnType getType(ColumnInspector signature){
return ColumnType.LONG;
}
@Override @Override
public Set<String> getDependentFields() { public Set<String> getDependentFields() {
return Sets.newHashSet(fieldName); return Sets.newHashSet(fieldName);
@@ -55,6 +62,9 @@ public class HdrHistogramToQuantilePostAggregator implements PostAggregator {
@Override @Override
public Object compute(Map<String, Object> values) { public Object compute(Map<String, Object> values) {
HistogramSketch histogram = (HistogramSketch) values.get(fieldName); HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
if(histogram == null){
return null;
}
return histogram.getValueAtPercentile(probability * 100); return histogram.getValueAtPercentile(probability * 100);
} }

View File: HdrHistogramToQuantilesPostAggregator.java

@@ -9,6 +9,8 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.*; import java.util.*;
@@ -29,6 +31,11 @@ public class HdrHistogramToQuantilesPostAggregator implements PostAggregator {
this.probabilitys = probabilitys; this.probabilitys = probabilitys;
} }
@Override
public ColumnType getType(ColumnInspector signature){
return ColumnType.LONG_ARRAY;
}
@Override @Override
@JsonProperty @JsonProperty
public String getName() { public String getName() {
@@ -49,7 +56,11 @@ public class HdrHistogramToQuantilesPostAggregator implements PostAggregator {
@Override @Override
public Object compute(Map<String, Object> values) { public Object compute(Map<String, Object> values) {
HistogramSketch histogram = (HistogramSketch) values.get(fieldName); HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
final long[] counts = new long[probabilitys.length]; if(histogram == null){
//return null;
return new Long[probabilitys.length];
}
final Long[] counts = new Long[probabilitys.length];
for (int i = 0; i < probabilitys.length; i++) { for (int i = 0; i < probabilitys.length; i++) {
counts[i] = histogram.getValueAtPercentile(probabilitys[i] * 100); counts[i] = histogram.getValueAtPercentile(probabilitys[i] * 100);
} }
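The three post-aggregators above share one pattern: each adds the getType(ColumnInspector) override (STRING, LONG, and LONG_ARRAY respectively) and guards compute() against a missing sketch, with the quantiles variant switching from long[] to Long[] so it can return an all-null array of the expected length. A hedged illustration of the null guard with stand-in method names, not the actual extension classes:

import java.util.Map;

public class NullSafeComputeExample {
    // Stand-in for the quantiles compute(Map) body: keep the output shape stable when the sketch column is absent.
    static Object computeQuantiles(Map<String, Object> values, String fieldName, float[] probabilities) {
        Object histogram = values.get(fieldName);
        if (histogram == null) {
            return new Long[probabilities.length]; // all-null Long[] rather than a primitive long[]
        }
        // the real code fills the array via histogram.getValueAtPercentile(p * 100) for each probability
        return histogram;
    }

    // Stand-in for the percentiles compute(Map) body: an empty JSON array replaces a missing sketch.
    static Object computePercentiles(Map<String, Object> values, String fieldName) {
        Object histogram = values.get(fieldName);
        return histogram == null ? "[]" : histogram;
    }
}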

View File: HdrHistogramObjectSqlAggregator.java

@@ -18,6 +18,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggregatorFactory; import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramMergeAggregatorFactory; import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramMergeAggregatorFactory;
import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.Aggregation;
@@ -118,11 +119,11 @@ public class HdrHistogramObjectSqlAggregator implements SqlAggregator {
} }
// No existing match found. Create a new one. // No existing match found. Create a new one.
final List<VirtualColumn> virtualColumns = new ArrayList<>(); // removed in the new version: final List<VirtualColumn> virtualColumns = new ArrayList<>();
if (input.isDirectColumnAccess()) { if (input.isDirectColumnAccess()) {
// the argument is a Histogram object // the argument is a Histogram object
if (rowSignature.getColumnType(input.getDirectColumn()).orElse(null) == ValueType.COMPLEX) { if (rowSignature.getColumnType(input.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) {
aggregatorFactory = new HdrHistogramMergeAggregatorFactory( aggregatorFactory = new HdrHistogramMergeAggregatorFactory(
histogramName, histogramName,
input.getDirectColumn(), input.getDirectColumn(),
@@ -142,12 +143,11 @@ public class HdrHistogramObjectSqlAggregator implements SqlAggregator {
); );
} }
} else { } else {
final VirtualColumn virtualColumn = final String virtualColumnName =
virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, input, SqlTypeName.BIGINT); virtualColumnRegistry.getOrCreateVirtualColumnForExpression(input, ColumnType.LONG);
virtualColumns.add(virtualColumn);
aggregatorFactory = new HdrHistogramAggregatorFactory( aggregatorFactory = new HdrHistogramAggregatorFactory(
histogramName, histogramName,
virtualColumn.getOutputName(), virtualColumnName,
lowestDiscernibleValue, lowestDiscernibleValue,
highestTrackableValue, highestTrackableValue,
numberOfSignificantValueDigits, numberOfSignificantValueDigits,
@@ -156,7 +156,6 @@ public class HdrHistogramObjectSqlAggregator implements SqlAggregator {
} }
return Aggregation.create( return Aggregation.create(
virtualColumns,
ImmutableList.of(aggregatorFactory), ImmutableList.of(aggregatorFactory),
null null
); );
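This hunk, together with the matching one in HdrHistogramQuantileSqlAggregator below, shows the planner-side change: the registry now returns a virtual column name for an expression plus a ColumnType, and Aggregation.create() no longer takes a virtual-column list. A sketch of the non-direct-column branch under those assumptions; the import location of VirtualColumnRegistry and the numeric histogram arguments are assumptions, not taken from this diff:

import com.google.common.collect.ImmutableList;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; // assumed upstream location of the registry type

public class VirtualColumnMigrationExample {
    static Aggregation buildForExpression(
            VirtualColumnRegistry virtualColumnRegistry,
            DruidExpression input,
            String histogramName
    ) {
        // New API: returns the generated column's name; previously this returned a VirtualColumn
        // that had to be collected and passed to Aggregation.create(...).
        String virtualColumnName =
                virtualColumnRegistry.getOrCreateVirtualColumnForExpression(input, ColumnType.LONG);

        AggregatorFactory factory = new HdrHistogramAggregatorFactory(
                histogramName,
                virtualColumnName, // previously virtualColumn.getOutputName()
                1L,                // illustrative lowestDiscernibleValue
                100L,              // illustrative highestTrackableValue
                2,                 // illustrative numberOfSignificantValueDigits
                true               // illustrative autoResize
        );

        // The virtual column is resolved from the registry when the native query is built,
        // so the aggregation no longer carries it explicitly.
        return Aggregation.create(ImmutableList.of(factory), null);
    }
}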

View File: HdrHistogramPercentilesOperatorConversion.java

@@ -14,16 +14,16 @@ import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramToPerc
import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.List; import java.util.List;
public class HdrHistogramPercentilesOperatorConversion extends DirectOperatorConversion { public class HdrHistogramPercentilesOperatorConversion implements SqlOperatorConversion {
private static final String FUNCTION_NAME = "HDR_GET_PERCENTILES"; private static final String FUNCTION_NAME = "HDR_GET_PERCENTILES";
private static final SqlFunction SQL_FUNCTION = OperatorConversions private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME)) .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
@@ -32,10 +32,6 @@ public class HdrHistogramPercentilesOperatorConversion extends DirectOperatorCon
.returnTypeInference(ReturnTypes.explicit(SqlTypeName.VARCHAR)) .returnTypeInference(ReturnTypes.explicit(SqlTypeName.VARCHAR))
.build(); .build();
public HdrHistogramPercentilesOperatorConversion() {
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override @Override
public SqlOperator calciteOperator() public SqlOperator calciteOperator()
{ {
@@ -66,7 +62,8 @@ public class HdrHistogramPercentilesOperatorConversion extends DirectOperatorCon
plannerContext, plannerContext,
rowSignature, rowSignature,
operands.get(0), operands.get(0),
postAggregatorVisitor postAggregatorVisitor,
true
); );
if (postAgg == null) { if (postAgg == null) {
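Both operator conversions now call the five-argument OperatorConversions.toPostAggregator(...), and this one implements SqlOperatorConversion directly instead of extending DirectOperatorConversion. A small sketch of the updated call; the purpose of the trailing boolean is an assumption (it appears to allow the operand to be converted even when it is a plain expression) rather than something stated in the diff:

import javax.annotation.Nullable;
import org.apache.calcite.rex.RexNode;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.planner.PlannerContext;

public class ToPostAggregatorCallExample {
    @Nullable
    static PostAggregator convertSketchOperand(
            PlannerContext plannerContext,
            RowSignature rowSignature,
            RexNode sketchOperand,
            PostAggregatorVisitor postAggregatorVisitor
    ) {
        return OperatorConversions.toPostAggregator(
                plannerContext,
                rowSignature,
                sketchOperand,
                postAggregatorVisitor,
                true // trailing flag added in the 26.0.0 signature, as used in the hunk above
        );
    }
}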

View File: HdrHistogramQuantileSqlAggregator.java

@@ -16,6 +16,7 @@ import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggreg
import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramMergeAggregatorFactory; import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramMergeAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramToQuantilePostAggregator; import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramToQuantilePostAggregator;
import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@@ -141,22 +142,16 @@ public class HdrHistogramQuantileSqlAggregator implements SqlAggregator {
// Check input for equivalence. // Check input for equivalence.
final boolean inputMatches; final boolean inputMatches;
final VirtualColumn virtualInput = existing.getVirtualColumns() final DruidExpression virtualInput =
virtualColumnRegistry.findVirtualColumnExpressions(theFactory.requiredFields())
.stream() .stream()
.filter(
virtualColumn ->
virtualColumn.getOutputName()
.equals(theFactory.getFieldName())
)
.findFirst() .findFirst()
.orElse(null); .orElse(null);
if (virtualInput == null) { if (virtualInput == null) {
inputMatches = input.isDirectColumnAccess() inputMatches = input.isDirectColumnAccess() && input.getDirectColumn().equals(theFactory.getFieldName());
&& input.getDirectColumn().equals(theFactory.getFieldName());
} else { } else {
inputMatches = ((ExpressionVirtualColumn) virtualInput).getExpression() inputMatches = virtualInput.equals(input);
.equals(input.getExpression());
} }
final boolean matches = inputMatches final boolean matches = inputMatches
@@ -177,11 +172,11 @@ public class HdrHistogramQuantileSqlAggregator implements SqlAggregator {
} }
// No existing match found. Create a new one. // No existing match found. Create a new one.
final List<VirtualColumn> virtualColumns = new ArrayList<>(); //final List<VirtualColumn> virtualColumns = new ArrayList<>();
if (input.isDirectColumnAccess()) { if (input.isDirectColumnAccess()) {
// the argument is a Histogram object // the argument is a Histogram object
if (rowSignature.getColumnType(input.getDirectColumn()).orElse(null) == ValueType.COMPLEX) { if (rowSignature.getColumnType(input.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) {
aggregatorFactory = new HdrHistogramMergeAggregatorFactory( aggregatorFactory = new HdrHistogramMergeAggregatorFactory(
histogramName, histogramName,
input.getDirectColumn(), input.getDirectColumn(),
@@ -201,12 +196,11 @@ public class HdrHistogramQuantileSqlAggregator implements SqlAggregator {
); );
} }
} else { } else {
final VirtualColumn virtualColumn = final String virtualColumnName =
virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, input, SqlTypeName.BIGINT); virtualColumnRegistry.getOrCreateVirtualColumnForExpression(input, ColumnType.LONG);
virtualColumns.add(virtualColumn);
aggregatorFactory = new HdrHistogramAggregatorFactory( aggregatorFactory = new HdrHistogramAggregatorFactory(
histogramName, histogramName,
virtualColumn.getOutputName(), virtualColumnName,
lowestDiscernibleValue, lowestDiscernibleValue,
highestTrackableValue, highestTrackableValue,
numberOfSignificantValueDigits, numberOfSignificantValueDigits,
@@ -234,7 +228,6 @@ public class HdrHistogramQuantileSqlAggregator implements SqlAggregator {
} }
return Aggregation.create( return Aggregation.create(
virtualColumns,
ImmutableList.of(aggregatorFactory), ImmutableList.of(aggregatorFactory),
new HdrHistogramToQuantilePostAggregator(name, histogramName, probability) new HdrHistogramToQuantilePostAggregator(name, histogramName, probability)
); );

View File: HdrHistogramQuantilesOperatorConversion.java

@@ -62,50 +62,30 @@ public class HdrHistogramQuantilesOperatorConversion implements SqlOperatorConve
{ {
final List<RexNode> operands = ((RexCall) rexNode).getOperands(); final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final float[] args = new float[operands.size() - 1]; final float[] args = new float[operands.size() - 1];
PostAggregator postAgg = null;
int operandCounter = 0; // the new version reads the sketch directly from the first operand
for (RexNode operand : operands) { final PostAggregator inputSketchPostAgg = OperatorConversions.toPostAggregator(
final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator(
plannerContext, plannerContext,
rowSignature, rowSignature,
operand, operands.get(0),
postAggregatorVisitor postAggregatorVisitor,
true
); );
if (convertedPostAgg == null) {
if (operandCounter > 0) { if (inputSketchPostAgg == null) {
try {
if (!operand.isA(SqlKind.LITERAL)) {
return null; return null;
} }
float arg = ((Number) RexLiteral.value(operand)).floatValue();
args[operandCounter - 1] = arg;
}
catch (ClassCastException cce) {
return null;
}
} else {
return null;
}
} else {
if (operandCounter == 0) {
postAgg = convertedPostAgg;
} else {
if (!operand.isA(SqlKind.LITERAL)) {
return null;
}
}
}
operandCounter++;
}
if (postAgg == null) { // parse the remaining operands as literals directly
return null; for (int i = 1; i < operands.size(); i++) {
RexNode operand = operands.get(i);
float arg = ((Number) RexLiteral.value(operand)).floatValue();
args[i - 1] = arg;
} }
return new HdrHistogramToQuantilesPostAggregator( return new HdrHistogramToQuantilesPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(), postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
((FieldAccessPostAggregator)postAgg).getFieldName(), ((FieldAccessPostAggregator)inputSketchPostAgg).getFieldName(),
args args
); );
} }
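With the sketch input resolved from operand 0, the rewritten conversion reads the remaining operands as numeric literals in a plain loop instead of the old operandCounter state machine. A hedged sketch of that argument parsing, assuming (as the new code does) that the operands past the first are literal numbers:

import java.util.List;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;

public class QuantileArgsExample {
    static float[] readQuantileArgs(List<RexNode> operands) {
        final float[] args = new float[operands.size() - 1];
        for (int i = 1; i < operands.size(); i++) {
            // A non-literal or non-numeric operand would throw here; the old code returned null instead.
            args[i - 1] = ((Number) RexLiteral.value(operands.get(i))).floatValue();
        }
        return args;
    }
}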

View File: HdrHistogramBufferAggregatorTest.java

@@ -2,17 +2,13 @@ package org.apache.druid.query.aggregation.sketch.HdrHistogram;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.HdrHistogram.*; import org.HdrHistogram.*;
import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.query.aggregation.datasketches.theta.SketchHolder;
import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
import org.apache.druid.query.groupby.epinephelinae.GrouperTestUtil; import org.apache.druid.query.groupby.epinephelinae.GrouperTestUtil;
import org.apache.druid.query.groupby.epinephelinae.TestColumnSelectorFactory; import org.apache.druid.query.groupby.epinephelinae.GroupByTestColumnSelectorFactory;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@@ -230,7 +226,7 @@ public class HdrHistogramBufferAggregatorTest {
@Test @Test
public void testMergeAggregatorRelocate() { public void testMergeAggregatorRelocate() {
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); final GroupByTestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
HistogramSketch histogram = new HistogramSketch(3); HistogramSketch histogram = new HistogramSketch(3);
for (int i = 0; i < 100000; i++) { for (int i = 0; i < 100000; i++) {
histogram.recordValue(i); histogram.recordValue(i);
@@ -252,7 +248,7 @@ public class HdrHistogramBufferAggregatorTest {
@Test @Test
public void testAggregatorRelocate() { public void testAggregatorRelocate() {
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); final GroupByTestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
HistogramSketch histogram = new HistogramSketch(3); HistogramSketch histogram = new HistogramSketch(3);
for (int i = 0; i < 100000; i++) { for (int i = 0; i < 100000; i++) {
histogram.recordValue(i); histogram.recordValue(i);

View File: HdrHistogramQuantileSqlAggregatorTest.java

@@ -1,12 +1,15 @@
package org.apache.druid.query.aggregation.sketch.HdrHistogram.sql; package org.apache.druid.query.aggregation.sketch.HdrHistogram.sql;
import com.alibaba.fastjson2.JSON;
import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.inject.Injector;
import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRow;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids; import org.apache.druid.query.Druids;
@@ -27,66 +30,49 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle; import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.SqlLifecycleFactory; import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.QueryTestRunner;
import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable; import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerFactory; import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.util.CalciteTestBase; import org.apache.druid.sql.calcite.util.*;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.*; import org.junit.*;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.*;
import java.util.List;
import java.util.Map;
public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase { public class HdrHistogramQuantileSqlAggregatorTest extends BaseCalciteQueryTest {
private static final String DATA_SOURCE = "foo"; @Override
public void gatherProperties(Properties properties)
private static QueryRunnerFactoryConglomerate conglomerate; {
private static Closer resourceCloser; super.gatherProperties(properties);
private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
PlannerContext.CTX_SQL_QUERY_ID, "dummy"
);
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create();
private SpecificSegmentsQuerySegmentWalker walker;
private SqlLifecycleFactory sqlLifecycleFactory;
@BeforeClass
public static void setUpClass() {
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
} }
@AfterClass @Override
public static void tearDownClass() throws IOException { public void configureGuice(DruidInjectorBuilder builder)
resourceCloser.close(); {
super.configureGuice(builder);
builder.addModule(new HdrHistogramModule());
} }
public static final List<InputRow> ROWS1 = ImmutableList.of( public static final List<InputRow> ROWS1 = ImmutableList.of(
CalciteTests.createRow( TestDataBuilder.createRow(
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put("t", "2000-01-01") .put("t", "2000-01-01")
.put("m1", "1") .put("m1", "1")
@@ -96,7 +82,7 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
.put("dim3", ImmutableList.of("a", "b")) .put("dim3", ImmutableList.of("a", "b"))
.build() .build()
), ),
CalciteTests.createRow( TestDataBuilder.createRow(
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put("t", "2000-01-02") .put("t", "2000-01-02")
.put("m1", "2.0") .put("m1", "2.0")
@@ -106,7 +92,7 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
.put("dim3", ImmutableList.of("b", "c")) .put("dim3", ImmutableList.of("b", "c"))
.build() .build()
), ),
CalciteTests.createRow( TestDataBuilder.createRow(
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put("t", "2000-01-03") .put("t", "2000-01-03")
.put("m1", "3.0") .put("m1", "3.0")
@@ -116,7 +102,7 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
.put("dim3", ImmutableList.of("d")) .put("dim3", ImmutableList.of("d"))
.build() .build()
), ),
CalciteTests.createRow( TestDataBuilder.createRow(
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put("t", "2001-01-01") .put("t", "2001-01-01")
.put("m1", "4.0") .put("m1", "4.0")
@@ -126,7 +112,7 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
.put("dim3", ImmutableList.of("")) .put("dim3", ImmutableList.of(""))
.build() .build()
), ),
CalciteTests.createRow( TestDataBuilder.createRow(
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put("t", "2001-01-02") .put("t", "2001-01-02")
.put("m1", "5.0") .put("m1", "5.0")
@@ -136,7 +122,7 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
.put("dim3", ImmutableList.of()) .put("dim3", ImmutableList.of())
.build() .build()
), ),
CalciteTests.createRow( TestDataBuilder.createRow(
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put("t", "2001-01-03") .put("t", "2001-01-03")
.put("m1", "6.0") .put("m1", "6.0")
@@ -146,15 +132,20 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
) )
); );
@Before @SuppressWarnings("resource")
public void setUp() throws Exception { @Override
public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final JoinableFactoryWrapper joinableFactory,
final Injector injector
) throws IOException{
HdrHistogramModule.registerSerde(); HdrHistogramModule.registerSerde();
for (Module mod : new HdrHistogramModule().getJacksonModules()) { for (Module mod : new HdrHistogramModule().getJacksonModules()) {
CalciteTests.getJsonMapper().registerModule(mod); CalciteTests.getJsonMapper().registerModule(mod);
TestHelper.JSON_MAPPER.registerModule(mod); TestHelper.JSON_MAPPER.registerModule(mod);
} }
//final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/testIndex-6201298"));
final QueryableIndex index = IndexBuilder.create() /*final QueryableIndex index = IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder()) .tmpDir(temporaryFolder.newFolder())
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema( .schema(
@@ -176,59 +167,135 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
) )
//.rows(CalciteTests.ROWS1) //.rows(CalciteTests.ROWS1)
.rows(ROWS1) .rows(ROWS1)
.buildMMappedIndex(); .buildMMappedIndex();*/
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( String[] files = new String[]{
"D:\\doc\\datas\\statistics_rule_segments\\2023-10-16T00_00_00.000Z_2023-10-17T00_00_00.000Z\\2023-10-16T07_51_47.981Z\\0\\17a457e4-599d-49c2-86e7-6655851bb99a\\index",
"D:\\doc\\datas\\statistics_rule_segments\\2023-10-15T00_00_00.000Z_2023-10-16T00_00_00.000Z\\2023-10-15T00_00_04.240Z\\15\\9a766f6c-779d-4f9f-9ff5-6a12c19b8c6c\\index"
};
files = new String[]{
"D:/doc/datas/testIndex-6201298"
};
SpecificSegmentsQuerySegmentWalker walker = new SpecificSegmentsQuerySegmentWalker(conglomerate);
for (int i = 0; i < files.length; i++) {
QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File(files[i]));
return walker.add(
DataSegment.builder() DataSegment.builder()
.dataSource(DATA_SOURCE) .dataSource(CalciteTests.DATASOURCE1)
.interval(index.getDataInterval()) .interval(index.getDataInterval())
.version("1") .version("1")
.shardSpec(new LinearShardSpec(0)) .shardSpec(new LinearShardSpec(i))
.size(0) .size(0)
.build(), .build(),
index index
); );
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(
new HdrHistogramQuantileSqlAggregator(),
new HdrHistogramObjectSqlAggregator()
),
ImmutableSet.of(
new HdrHistogramQuantilesOperatorConversion(),
new HdrHistogramPercentilesOperatorConversion()
)
);
SchemaPlus rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
CalciteTests.createExprMacroTable(),
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
)
);
} }
@After return walker;
public void tearDown() throws Exception { }
walker.close();
walker = null; @Test
public void testCount0() throws Exception {
String sql = "select count(1) cnt, APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2) from druid.foo where dim1 = 'aaa'";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQueryError() throws Exception {
String sql = "select min(__time) min_time,max(__time) max_time, HDR_HISTOGRAM(latency_ms_sketch) hdr from druid.foo";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
} }
@Test @Test
public void testSqlQuery() throws Exception { public void testSqlQuery() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); String[] columns = new String[]{"__time", "dim1", "dim2", "dim3", "cnt", "hist_m1", "m1"};
String sql = "select * from druid.foo"; String sql = "select " + String.join(",", columns) + " from druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql);
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
Map row = new LinkedHashMap();
for (int i = 0; i < result.length; i++) {
row.put(columns[i], result[i]);
}
System.out.println(JSON.toJSONString(row));
// System.out.println(Arrays.toString(result));
}
for (int i = 0; i < columns.length; i++) {
Object[] values = new Object[results.size()];
for (int j = 0; j < results.size(); j++) {
values[j] = results.get(j)[i];
}
System.out.println(columns[i] + ":" + Arrays.toString(values));
}
}
@Test
public void testSqlQuery3() throws Exception {
//cannotVectorize();
//String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
String sql = "select HDR_HISTOGRAM(hist_m1) hdr from druid.foo ";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQuery4() throws Exception {
//cannotVectorize();
//String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
String sql = "select APPROX_QUANTILE_HDR (hdr, 0.95) as p95th_tcp_latency_ms from (select HDR_HISTOGRAM(hist_m1) hdr from druid.foo) t ";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQuery5() throws Exception {
//cannotVectorize();
//String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
String sql = "select dim1, APPROX_QUANTILE_HDR (hdr, 0.95) as p95th_tcp_latency_ms from (select dim1, HDR_HISTOGRAM(hist_m1) hdr from druid.foo group by dim1) t group by dim1";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQuery6() throws Exception {
//cannotVectorize();
//String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
//String sql = "select dim1, APPROX_QUANTILE_HDR (hdr, 0.95) as p95th_tcp_latency_ms from (select dim1, HDR_HISTOGRAM(hist_m1) hdr from druid.foo group by dim1 limit 10) t group by dim1";
String sql = "select dim1, HDR_GET_QUANTILES(HDR_HISTOGRAM(hdr), 0.95) as p95th_tcp_latency_ms from (select dim1, HDR_HISTOGRAM(hist_m1) hdr from druid.foo group by dim1 limit 10) t group by dim1";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
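The hunks above migrate the test class from the removed SqlLifecycle/SqlLifecycleFactory plumbing to BaseCalciteQueryTest: the extension is wired in through configureGuice() and createQuerySegmentWalker(), and each test runs SQL through a QueryTestBuilder. A minimal sketch of that run-and-read pattern; the SQL string is illustrative only:

import java.util.Arrays;
import java.util.List;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.QueryTestRunner;
import org.junit.Test;

public class QueryTestBuilderPatternExample extends BaseCalciteQueryTest {
    @Test
    public void testRunAndReadResults() {
        // Replaces sqlLifecycleFactory.factorize() + runSimple(...): build the query, run it, then read the rows.
        QueryTestBuilder builder = testBuilder()
                .sql("select APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2) from druid.foo")
                .skipVectorize(); // mirrors the tests above, which skip vectorized execution
        builder.run();

        QueryTestRunner.QueryResults queryResults = builder.results();
        List<Object[]> results = queryResults.results;
        for (Object[] row : results) {
            System.out.println(Arrays.toString(row));
        }
    }
}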
@@ -236,10 +303,11 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testGroup() throws Exception { public void testGroup() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
String sql = "select cnt, APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2) from druid.foo group by cnt"; String sql = "select cnt, APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2) from druid.foo group by cnt";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -247,10 +315,35 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testGroup2() throws Exception { public void testGroup2() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
String sql = "select HDR_HISTOGRAM(hist_m1) from druid.foo"; String sql = "select HDR_HISTOGRAM(hist_m1) from druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testGroup3() throws Exception {
String sql = "select APPROX_QUANTILE_HDR(h, 0.5) from(select HDR_HISTOGRAM(hist_m1) h from druid.foo) t";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testGroup4() throws Exception {
String sql = "select hdr_get_quantiles(h, 0.1, 0.2, 0.3, 0.5, 0.9, 0.99, 1) from(select HDR_HISTOGRAM(hist_m1) h from druid.foo) t";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -258,10 +351,11 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testSqlQueryGeneHdr() throws Exception { public void testSqlQueryGeneHdr() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
String sql = "select HDR_HISTOGRAM(hist_m1, 1, 100, 2), HDR_HISTOGRAM(cnt, 1, 100, 2) from druid.foo"; String sql = "select HDR_HISTOGRAM(hist_m1, 1, 100, 2), HDR_HISTOGRAM(cnt, 1, 100, 2) from druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -269,11 +363,12 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testSqlQueryGeneHdr2() throws Exception { public void testSqlQueryGeneHdr2() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
// HDR_HISTOGRAM(hist_m1, 1, 100, 2), // HDR_HISTOGRAM(hist_m1, 1, 100, 2),
String sql = "select HDR_GET_QUANTILES(HDR_HISTOGRAM(m1, 1, 100, 2), 0.1, 0.2, 0.3, 0.5, 0.9, 1) from druid.foo"; String sql = "select HDR_GET_QUANTILES(HDR_HISTOGRAM(m1, 1, 100, 2), 0.1, 0.2, 0.3, 0.5, 0.9, 1) from druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -281,14 +376,15 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testSqlQueryGeneHdrArgs() throws Exception { public void testSqlQueryGeneHdrArgs() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
String sql = "select HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1), 0.1, 0.2, 0.3, 0.5, 0.9, 1), " String sql = "select HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1), 0.1, 0.2, 0.3, 0.5, 0.9, 1), "
+ "HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1, 2), 0.1, 0.2, 0.3, 0.5, 0.9, 1) ,\n" + "HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1, 2), 0.1, 0.2, 0.3, 0.5, 0.9, 1) ,\n"
+ "HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1, 1, 110, 2), 0.1, 0.2, 0.3, 0.5, 0.9, 1) ,\n" + "HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1, 1, 110, 2), 0.1, 0.2, 0.3, 0.5, 0.9, 1) ,\n"
+ "HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1, 1, 110, 2, false), 0.1, 0.2, 0.3, 0.5, 0.9, 1) \n" + "HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1, 1, 110, 2, false), 0.1, 0.2, 0.3, 0.5, 0.9, 1) \n"
+ "from druid.foo"; + "from druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -296,14 +392,15 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testSqlQueryGeneHdrArgs2() throws Exception { public void testSqlQueryGeneHdrArgs2() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
String sql = "select APPROX_QUANTILE_HDR(m1, 0.1), " String sql = "select APPROX_QUANTILE_HDR(m1, 0.1), "
+ "APPROX_QUANTILE_HDR(m1, 0.1, 2) ,\n" + "APPROX_QUANTILE_HDR(m1, 0.1, 2) ,\n"
+ "APPROX_QUANTILE_HDR(m1, 0.1, 1, 110, 2) ,\n" + "APPROX_QUANTILE_HDR(m1, 0.1, 1, 110, 2) ,\n"
+ "APPROX_QUANTILE_HDR(m1, 0.1, 1, 110, 2, false)\n" + "APPROX_QUANTILE_HDR(m1, 0.1, 1, 110, 2, false)\n"
+ "from druid.foo"; + "from druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -311,14 +408,15 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testSqlQueryGeneHdr3() throws Exception { public void testSqlQueryGeneHdr3() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
// function names are case-insensitive // function names are case-insensitive
// HDR_HISTOGRAM(hist_m1, 1, 100, 2), // HDR_HISTOGRAM(hist_m1, 1, 100, 2),
//String sql = "select HDR_GET_PERCENTILES(HDR_HISTOGRAM(m1, 1, 100, 2)) from druid.foo"; //String sql = "select HDR_GET_PERCENTILES(HDR_HISTOGRAM(m1, 1, 100, 2)) from druid.foo";
//String sql = "select hdr_get_percentiles(hdr_histogram(m1, 1, 100, 2)) from druid.foo"; //String sql = "select hdr_get_percentiles(hdr_histogram(m1, 1, 100, 2)) from druid.foo";
String sql = "select hdr_get_percentiles(hdr_histogram(hist_m1, 1, 100, 2)) from druid.foo"; String sql = "select hdr_get_percentiles(hdr_histogram(hist_m1, 1, 100, 2)) from druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -326,7 +424,6 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testSqlQueryQuantiles() throws Exception { public void testSqlQueryQuantiles() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
String sql = "SELECT\n" String sql = "SELECT\n"
+ "APPROX_QUANTILE_HDR(m1, 0.01, 1, 100, 2),\n" + "APPROX_QUANTILE_HDR(m1, 0.01, 1, 100, 2),\n"
+ "APPROX_QUANTILE_HDR(m1, 0.5, 1, 100, 2),\n" + "APPROX_QUANTILE_HDR(m1, 0.5, 1, 100, 2),\n"
@@ -338,9 +435,10 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
+ "APPROX_QUANTILE_HDR(m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc'),\n" + "APPROX_QUANTILE_HDR(m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc'),\n"
+ "APPROX_QUANTILE_HDR(cnt, 0.5, 1, 100, 2)\n" + "APPROX_QUANTILE_HDR(cnt, 0.5, 1, 100, 2)\n"
+ "FROM foo"; + "FROM foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
System.out.println(sql); QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -348,7 +446,6 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testSqlQueryQuantilesOnComplexColumn() throws Exception { public void testSqlQueryQuantilesOnComplexColumn() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
String sql = "SELECT\n" String sql = "SELECT\n"
+ "APPROX_QUANTILE_HDR(hist_m1, 0.01, 1, 100, 2),\n" + "APPROX_QUANTILE_HDR(hist_m1, 0.01, 1, 100, 2),\n"
+ "APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2),\n" + "APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2),\n"
@@ -358,9 +455,10 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
+ "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 <> 'abc'),\n" + "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 <> 'abc'),\n"
+ "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc')\n" + "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc')\n"
+ "FROM foo"; + "FROM foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
System.out.println(sql); QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -373,7 +471,6 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testQuantileOnFloatAndLongs() throws Exception { public void testQuantileOnFloatAndLongs() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
String sql = "SELECT\n" String sql = "SELECT\n"
+ "APPROX_QUANTILE_HDR(m1, 0.01, 1, 100, 2),\n" + "APPROX_QUANTILE_HDR(m1, 0.01, 1, 100, 2),\n"
+ "APPROX_QUANTILE_HDR(m1, 0.5, 1, 100, 2),\n" + "APPROX_QUANTILE_HDR(m1, 0.5, 1, 100, 2),\n"
@@ -385,16 +482,8 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
+ "APPROX_QUANTILE_HDR(m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc'),\n" + "APPROX_QUANTILE_HDR(m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc'),\n"
+ "APPROX_QUANTILE_HDR(cnt, 0.5, 1, 100, 2)\n" + "APPROX_QUANTILE_HDR(cnt, 0.5, 1, 100, 2)\n"
+ "FROM foo"; + "FROM foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder = builder.expectedQueries(Collections.singletonList(Druids.newTimeseriesQueryBuilder()
System.out.println(sql);
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
// Verify query
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1) .dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL) .granularity(Granularities.ALL)
@@ -402,7 +491,7 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
new ExpressionVirtualColumn( new ExpressionVirtualColumn(
"v0", "v0",
"(\"m1\" * 2)", "(\"m1\" * 2)",
ValueType.LONG, ColumnType.LONG,
TestExprMacroTable.INSTANCE TestExprMacroTable.INSTANCE
) )
) )
@@ -430,15 +519,18 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
new HdrHistogramToQuantilePostAggregator("a7", "a5:agg", 0.999f), new HdrHistogramToQuantilePostAggregator("a7", "a5:agg", 0.999f),
new HdrHistogramToQuantilePostAggregator("a8", "a8:agg", 0.50f) new HdrHistogramToQuantilePostAggregator("a8", "a8:agg", 0.50f)
) )
.context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) .context(QUERY_CONTEXT_DEFAULT)
.build(), .build()));
Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) builder.run();
); QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
} }
@Test @Test
public void testQuantileOnComplexColumn() throws Exception{ public void testQuantileOnComplexColumn() throws Exception{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
String sql = "SELECT\n" String sql = "SELECT\n"
+ "APPROX_QUANTILE_HDR(hist_m1, 0.01, 1, 100, 2),\n" + "APPROX_QUANTILE_HDR(hist_m1, 0.01, 1, 100, 2),\n"
+ "APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2),\n" + "APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2),\n"
@@ -448,16 +540,8 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
+ "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 <> 'abc'),\n" + "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 <> 'abc'),\n"
+ "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc')\n" + "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc')\n"
+ "FROM foo"; + "FROM foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder = builder.expectedQueries(Collections.singletonList(Druids.newTimeseriesQueryBuilder()
System.out.println(sql);
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
// Verify query
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1) .dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL) .granularity(Granularities.ALL)
@@ -481,10 +565,17 @@ public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase {
new HdrHistogramToQuantilePostAggregator("a5", "a5:agg", 0.999f), new HdrHistogramToQuantilePostAggregator("a5", "a5:agg", 0.999f),
new HdrHistogramToQuantilePostAggregator("a6", "a4:agg", 0.999f) new HdrHistogramToQuantilePostAggregator("a6", "a4:agg", 0.999f)
) )
.context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) .context(QUERY_CONTEXT_DEFAULT)
.build(), .build()));
Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) builder.run();
); QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
} }
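The same SqlLifecycle-to-QueryTestBuilder conversion repeats in every test in this file; a condensed recap of the pattern, written as a method body inside a BaseCalciteQueryTest subclass (the query text is just one of the examples above):
// Build, run, and read back a SQL test query against the test datasource in Druid 26.0.
QueryTestBuilder builder = testBuilder()
    .sql("select hdr_get_percentiles(hdr_histogram(m1, 1, 100, 2)) from druid.foo")
    .skipVectorize();                                  // these aggregators are not vectorized
// Optionally pin the expected native query, as testQuantileOnFloatAndLongs does:
//   builder = builder.expectedQueries(Collections.singletonList(expectedTimeseriesQuery));
builder.run();                                         // plan + execute
QueryTestRunner.QueryResults queryResults = builder.results();
for (Object[] row : queryResults.results) {
    System.out.println(Arrays.toString(row));
}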
private static PostAggregator makeFieldAccessPostAgg(String name) { private static PostAggregator makeFieldAccessPostAgg(String name) {

View File

@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>org.apache.druid.extensions</groupId> <groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-hlld_0.18.1</artifactId> <artifactId>druid-hlld_26.0.0</artifactId>
<name>druid-hlld</name> <name>druid-hlld</name>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
@@ -14,7 +14,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<druid.version>0.18.1</druid.version> <druid.version>26.0.0</druid.version>
</properties> </properties>
<dependencies> <dependencies>
@@ -33,6 +33,14 @@
</dependency> </dependency>
<!-- Tests --> <!-- Tests -->
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>4.3</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.druid</groupId> <groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId> <artifactId>druid-processing</artifactId>
@@ -42,9 +50,17 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.druid</groupId> <groupId>org.apache.druid</groupId>
<artifactId>druid-benchmarks</artifactId> <artifactId>druid-server</artifactId>
<version>${druid.version}</version> <version>${druid.version}</version>
<scope>test</scope> <scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
<version>${druid.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@@ -9,6 +9,7 @@ import org.apache.druid.query.aggregation.*;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Collections; import java.util.Collections;
@@ -27,6 +28,7 @@ public class HllAggregatorFactory extends AggregatorFactory {
protected final String fieldName; protected final String fieldName;
protected final int precision; protected final int precision;
protected final boolean round; protected final boolean round;
protected final int updatableSerializationBytes;
public HllAggregatorFactory( public HllAggregatorFactory(
@JsonProperty("name") final String name, @JsonProperty("name") final String name,
@@ -44,6 +46,7 @@ public class HllAggregatorFactory extends AggregatorFactory {
this.fieldName = fieldName; this.fieldName = fieldName;
this.precision = precision == null ? DEFAULT_PRECISION : precision; this.precision = precision == null ? DEFAULT_PRECISION : precision;
this.round = round == null ? DEFAULT_ROUND : round; this.round = round == null ? DEFAULT_ROUND : round;
this.updatableSerializationBytes = getUpdatableSerializationBytes();
} }
@Override @Override
@@ -145,9 +148,9 @@ public class HllAggregatorFactory extends AggregatorFactory {
Math.max(precision, castedOther.precision), Math.max(precision, castedOther.precision),
round || castedOther.round round || castedOther.round
); );
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
} }
throw new AggregatorFactoryNotMergeableException(this, other);
} }
@Override @Override
@@ -157,25 +160,42 @@ public class HllAggregatorFactory extends AggregatorFactory {
); );
} }
@Override
public AggregatorFactory withName(String newName) {
return new HllAggregatorFactory(newName, fieldName, precision, round);
}
@Override @Override
public Object deserialize(Object object) { public Object deserialize(Object object) {
if (object == null) {
return null;
}
return HllUtils.deserializeHll(object); return HllUtils.deserializeHll(object);
} }
@Override
public ColumnType getResultType() {
//return round ? ColumnType.LONG : ColumnType.DOUBLE;
return getIntermediateType();
}
@Nullable @Nullable
@Override @Override
public Object finalizeComputation(@Nullable Object object) { public Object finalizeComputation(@Nullable Object object) {
if (object == null) { if (object == null) {
return null; return null;
} }
final Hll hll = (Hll) object;
return object;
/*final Hll hll = (Hll) object;
final double estimate = hll.size(); final double estimate = hll.size();
if (round) { if (round) {
return Math.round(estimate); return Math.round(estimate);
} else { } else {
return estimate; return estimate;
} }*/
} }
@Override @Override
@@ -199,9 +219,16 @@ public class HllAggregatorFactory extends AggregatorFactory {
return round; return round;
} }
/*
This method no longer exists; the new version must implement getIntermediateType() instead
@Override @Override
public String getTypeName() { public String getTypeName() {
return HllModule.HLLD_BUILD_TYPE_NAME; return HllModule.HLLD_BUILD_TYPE_NAME;
}*/
@Override
public ColumnType getIntermediateType() {
return HllModule.BUILD_TYPE;
} }
@Override @Override
@@ -211,6 +238,10 @@ public class HllAggregatorFactory extends AggregatorFactory {
@Override @Override
public int getMaxIntermediateSize() { public int getMaxIntermediateSize() {
return updatableSerializationBytes == 0? getUpdatableSerializationBytes():updatableSerializationBytes;
}
protected int getUpdatableSerializationBytes(){
return Hll.getUpdatableSerializationBytes(precision); return Hll.getUpdatableSerializationBytes(precision);
} }
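Porting note for similar factories: the core of the change above is that the string-based getTypeName() gives way to ColumnType accessors, and withName(String) becomes required. A minimal sketch under illustrative names (ExampleSketch is not part of this merge request):
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnType;

// Illustrative skeleton only; a real factory still implements the remaining abstract methods.
public abstract class ExampleSketchAggregatorFactory extends AggregatorFactory {
    // Pre-26.0: public String getTypeName() { return "ExampleSketch"; }

    // 26.0: the stored (intermediate) type is declared as a ColumnType...
    @Override
    public ColumnType getIntermediateType() {
        return ColumnType.ofComplex("ExampleSketch"); // assumed registered complex type name
    }

    // ...and the finalized result type is declared separately.
    @Override
    public ColumnType getResultType() {
        return getIntermediateType(); // this merge keeps the raw sketch as the result
    }

    // 26.0 also expects withName(String); see the HllAggregatorFactory change above.
}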

View File

@@ -4,10 +4,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.zdjz.galaxy.sketch.hlld.Hll; import com.zdjz.galaxy.sketch.hlld.Hll;
import com.zdjz.galaxy.sketch.hlld.HllUnion; import com.zdjz.galaxy.sketch.hlld.HllUnion;
import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@@ -21,9 +23,16 @@ public class HllMergeAggregatorFactory extends HllAggregatorFactory{
super(name, fieldName, precision, round); super(name, fieldName, precision, round);
} }
/*
This method no longer exists; the new version must implement getIntermediateType() instead
@Override @Override
public String getTypeName(){ public String getTypeName(){
return HllModule.HLLD_TYPE_NAME; return HllModule.HLLD_TYPE_NAME;
}*/
@Override
public ColumnType getIntermediateType() {
return HllModule.TYPE;
} }
@Override @Override
@@ -44,6 +53,11 @@ public class HllMergeAggregatorFactory extends HllAggregatorFactory{
); );
} }
@Override
public AggregatorFactory withName(String newName) {
return new HllMergeAggregatorFactory(newName, fieldName, precision, round);
}
@Override @Override
public byte[] getCacheKey() { public byte[] getCacheKey() {
return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_MERGE_CACHE_TYPE_ID) return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_MERGE_CACHE_TYPE_ID)
@@ -53,7 +67,7 @@ public class HllMergeAggregatorFactory extends HllAggregatorFactory{
} }
@Override @Override
public int getMaxIntermediateSize() { protected int getUpdatableSerializationBytes() {
return HllUnion.getUpdatableSerializationBytes(precision); return HllUnion.getUpdatableSerializationBytes(precision);
} }
} }

View File

@@ -10,6 +10,7 @@ import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllApproxCountDistinctSqlAggregator; import org.apache.druid.query.aggregation.sketch.hlld.sql.HllApproxCountDistinctSqlAggregator;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllEstimateOperatorConversion; import org.apache.druid.query.aggregation.sketch.hlld.sql.HllEstimateOperatorConversion;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllObjectSqlAggregator; import org.apache.druid.query.aggregation.sketch.hlld.sql.HllObjectSqlAggregator;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.sql.guice.SqlBindings; import org.apache.druid.sql.guice.SqlBindings;
@@ -24,6 +25,9 @@ public class HllModule implements DruidModule {
public static final String HLLD_TYPE_NAME = "HLLDSketch"; public static final String HLLD_TYPE_NAME = "HLLDSketch";
public static final String HLLD_BUILD_TYPE_NAME = "HLLDSketchBuild"; public static final String HLLD_BUILD_TYPE_NAME = "HLLDSketchBuild";
public static final ColumnType TYPE = ColumnType.ofComplex(HLLD_TYPE_NAME);
public static final ColumnType BUILD_TYPE = ColumnType.ofComplex(HLLD_BUILD_TYPE_NAME);
@Override @Override
public void configure(Binder binder) { public void configure(Binder binder) {

View File

@@ -7,6 +7,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;
import java.util.Comparator; import java.util.Comparator;
import java.util.Map; import java.util.Map;
@@ -29,6 +31,12 @@ public class HllToEstimatePostAggregator implements PostAggregator {
this.round = round; this.round = round;
} }
// Method the new version requires us to implement
@Override
public ColumnType getType(ColumnInspector signature) {
return round ? ColumnType.LONG : ColumnType.DOUBLE;
}
@Override @Override
@JsonProperty @JsonProperty
public String getName() { public String getName() {
@@ -58,6 +66,9 @@ public class HllToEstimatePostAggregator implements PostAggregator {
@Override @Override
public Object compute(final Map<String, Object> combinedAggregators) { public Object compute(final Map<String, Object> combinedAggregators) {
final Hll sketch = (Hll) field.compute(combinedAggregators); final Hll sketch = (Hll) field.compute(combinedAggregators);
if(sketch == null){
return round ? 0L: 0D;
}
return round ? Math.round(sketch.size()) : sketch.size(); return round ? Math.round(sketch.size()) : sketch.size();
} }
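The null guard and the getType(ColumnInspector) override are the 26.0-specific pieces here; a minimal sketch with illustrative names:
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;

// Illustrative skeleton; only the newly required type declaration is shown.
public abstract class ExampleEstimatePostAggregator implements PostAggregator {
    protected final boolean round;

    protected ExampleEstimatePostAggregator(boolean round) {
        this.round = round;
    }

    // New in 26.0: post-aggregators declare their output type up front.
    @Override
    public ColumnType getType(ColumnInspector signature) {
        return round ? ColumnType.LONG : ColumnType.DOUBLE;
    }
}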

View File

@@ -5,36 +5,44 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.*; import org.apache.calcite.sql.type.*;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.segment.VirtualColumn; import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator;
import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.Aggregation;
import java.util.Collections; import java.util.Collections;
import java.util.List;
public class HllApproxCountDistinctSqlAggregator extends HllBaseSqlAggregator { public class HllApproxCountDistinctSqlAggregator extends HllBaseSqlAggregator {
private static final SqlAggFunction FUNCTION_INSTANCE = new CPCSketchApproxCountDistinctSqlAggFunction(); private static final SqlAggFunction FUNCTION_INSTANCE = new CPCSketchApproxCountDistinctSqlAggFunction();
private static final String NAME = "APPROX_COUNT_DISTINCT_HLLD"; private static final String NAME = "APPROX_COUNT_DISTINCT_HLLD";
public HllApproxCountDistinctSqlAggregator(){
super(true);
}
@Override @Override
public SqlAggFunction calciteFunction() { public SqlAggFunction calciteFunction() {
return FUNCTION_INSTANCE; return FUNCTION_INSTANCE;
} }
// The new version drops the virtualColumns parameter
@Override @Override
protected Aggregation toAggregation( protected Aggregation toAggregation(
String name, String name,
boolean finalizeAggregations, boolean finalizeAggregations,
List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory AggregatorFactory aggregatorFactory
) { ) {
return Aggregation.create( return Aggregation.create(
virtualColumns,
Collections.singletonList(aggregatorFactory), Collections.singletonList(aggregatorFactory),
// Presumably flags whether this is the outermost function // Presumably flags whether this is the outermost function
finalizeAggregations ? new FinalizingFieldAccessPostAggregator( finalizeAggregations ? new HllToEstimatePostAggregator(
name, name,
new FieldAccessPostAggregator(
aggregatorFactory.getName(),
aggregatorFactory.getName() aggregatorFactory.getName()
),
((HllAggregatorFactory)aggregatorFactory).isRound()
) : null ) : null
); );
} }
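The switch from FinalizingFieldAccessPostAggregator to HllToEstimatePostAggregator appears to follow from the factory change earlier in this merge: finalizeComputation(...) now returns the sketch itself, so a finalizing field access would no longer yield a numeric estimate. Recap of the two finalizers as they appear in this diff (middle argument elided):
// Pre-26.0: read the factory's finalized value back out.
//   new FinalizingFieldAccessPostAggregator(name,
//       new FieldAccessPostAggregator(aggregatorFactory.getName(), aggregatorFactory.getName()))
// 26.0 (this extension): compute the estimate explicitly, honoring the round flag.
//   new HllToEstimatePostAggregator(name, ..., ((HllAggregatorFactory) aggregatorFactory).isRound())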

View File

@@ -2,6 +2,7 @@ package org.apache.druid.query.aggregation.sketch.hlld.sql;
import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexNode;
@@ -14,6 +15,7 @@ import org.apache.druid.query.aggregation.sketch.hlld.HllMergeAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.Aggregation;
@@ -29,6 +31,13 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
public abstract class HllBaseSqlAggregator implements SqlAggregator { public abstract class HllBaseSqlAggregator implements SqlAggregator {
private final boolean finalizeSketch;
protected HllBaseSqlAggregator(boolean finalizeSketch){
this.finalizeSketch = finalizeSketch;
}
@Nullable @Nullable
@Override @Override
public Aggregation toDruidAggregation( public Aggregation toDruidAggregation(
@@ -93,13 +102,14 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
round = HllAggregatorFactory.DEFAULT_ROUND; round = HllAggregatorFactory.DEFAULT_ROUND;
} }
final List<VirtualColumn> virtualColumns = new ArrayList<>(); // Removed in the new version: final List<VirtualColumn> virtualColumns = new ArrayList<>();
final AggregatorFactory aggregatorFactory; final AggregatorFactory aggregatorFactory;
final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name; //final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
final String aggregatorName = finalizeSketch ? Calcites.makePrefixedName(name, "a") : name;
// If the input is Cpc, return HllMergeAggregatorFactory // If the input is Hll, return HllMergeAggregatorFactory
if (columnArg.isDirectColumnAccess() if (columnArg.isDirectColumnAccess()
&& rowSignature.getColumnType(columnArg.getDirectColumn()).orElse(null) == ValueType.COMPLEX) { && rowSignature.getColumnType(columnArg.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) {
// This is the concrete aggregator factory // This is the concrete aggregator factory
aggregatorFactory = new HllMergeAggregatorFactory( aggregatorFactory = new HllMergeAggregatorFactory(
aggregatorName, aggregatorName,
@@ -109,10 +119,10 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
); );
} else { } else {
// If the input is a regular column, use the build-style HllAggregatorFactory // If the input is a regular column, use the build-style HllAggregatorFactory
final SqlTypeName sqlTypeName = columnRexNode.getType().getSqlTypeName(); final RelDataType dataType = columnRexNode.getType();
final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName); final ColumnType inputType = Calcites.getColumnTypeForRelDataType(dataType);
if (inputType == null) { if (inputType == null) {
throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName); throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", dataType.getSqlTypeName(), aggregatorName);
} }
final DimensionSpec dimensionSpec; final DimensionSpec dimensionSpec;
@@ -120,15 +130,22 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
if (columnArg.isDirectColumnAccess()) { if (columnArg.isDirectColumnAccess()) {
dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null, inputType); dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null, inputType);
} else { } else {
VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( String virtualColumnName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
columnArg, columnArg,
sqlTypeName dataType
); );
dimensionSpec = new DefaultDimensionSpec(virtualColumn.getOutputName(), null, inputType); dimensionSpec = new DefaultDimensionSpec(virtualColumnName, null, inputType);
virtualColumns.add(virtualColumn);
} }
// New version: also handle the case where the registered input type is an Hll complex column
if (inputType.is(ValueType.COMPLEX)) {
aggregatorFactory = new HllMergeAggregatorFactory(
aggregatorName,
dimensionSpec.getOutputName(),
precision,
round
);
} else {
aggregatorFactory = new HllAggregatorFactory( aggregatorFactory = new HllAggregatorFactory(
aggregatorName, aggregatorName,
dimensionSpec.getDimension(), dimensionSpec.getDimension(),
@@ -136,11 +153,11 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
round round
); );
} }
}
return toAggregation( return toAggregation(
name, name,
finalizeAggregations, finalizeSketch,
virtualColumns,
aggregatorFactory aggregatorFactory
); );
} }
@@ -148,7 +165,6 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
protected abstract Aggregation toAggregation( protected abstract Aggregation toAggregation(
String name, String name,
boolean finalizeAggregations, boolean finalizeAggregations,
List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory AggregatorFactory aggregatorFactory
); );
} }
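Because the flattened side-by-side diff above is hard to follow, here is a condensed, comment-only recap of the 26.0 control flow in toDruidAggregation(...); it restates the code above rather than adding anything new:
// 1. Direct access to a COMPLEX column                    -> HllMergeAggregatorFactory
// 2. Expression input: register a virtual column; in 26.0 the registry returns a column name String
//      String vc = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(columnArg, dataType);
//      dimensionSpec = new DefaultDimensionSpec(vc, null, inputType);
// 3. Registered column whose type is COMPLEX              -> HllMergeAggregatorFactory
//    otherwise (numeric/string input)                     -> HllAggregatorFactory (build)
// 4. toAggregation(name, finalizeSketch, aggregatorFactory)   // no virtualColumns parameter anymore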

View File

@@ -13,16 +13,15 @@ import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory; import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator; import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion; import org.apache.druid.sql.calcite.expression.*;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.List; import java.util.List;
public class HllEstimateOperatorConversion extends DirectOperatorConversion { // Post-aggregator only: toDruidExpression returns null, i.e. post-aggregation UDFs behave differently from ordinary UDFs.
// The new version changes the supertype directly
public class HllEstimateOperatorConversion implements SqlOperatorConversion {
private static final String FUNCTION_NAME = "HLLD_ESTIMATE"; private static final String FUNCTION_NAME = "HLLD_ESTIMATE";
private static final SqlFunction SQL_FUNCTION = OperatorConversions private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME)) .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
@@ -32,9 +31,7 @@ public class HllEstimateOperatorConversion extends DirectOperatorConversion {
.returnTypeInference(ReturnTypes.DOUBLE) .returnTypeInference(ReturnTypes.DOUBLE)
.build(); .build();
public HllEstimateOperatorConversion() { // The new version drops this constructor
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override @Override
public SqlOperator calciteOperator() { public SqlOperator calciteOperator() {
@@ -63,7 +60,8 @@ public class HllEstimateOperatorConversion extends DirectOperatorConversion {
plannerContext, plannerContext,
rowSignature, rowSignature,
operands.get(0), operands.get(0),
postAggregatorVisitor postAggregatorVisitor,
true // New parameter added in this version
); );
if (firstOperand == null) { if (firstOperand == null) {
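For other post-aggregation-only functions, the shape of the 26.0 conversion is: implement SqlOperatorConversion directly and return null from toDruidExpression. A minimal sketch (illustrative class name; calciteOperator() and toPostAggregator(...) omitted):
import org.apache.calcite.rex.RexNode;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;

import javax.annotation.Nullable;

public abstract class ExampleEstimateOperatorConversion implements SqlOperatorConversion {
    @Nullable
    @Override
    public DruidExpression toDruidExpression(
        PlannerContext plannerContext,
        RowSignature rowSignature,
        RexNode rexNode
    ) {
        // No native expression form: this function only exists as a post-aggregator.
        return null;
    }
}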

View File

@@ -5,16 +5,18 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.*; import org.apache.calcite.sql.type.*;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.Aggregation;
import java.util.Collections; import java.util.Collections;
import java.util.List;
public class HllObjectSqlAggregator extends HllBaseSqlAggregator { public class HllObjectSqlAggregator extends HllBaseSqlAggregator {
private static final SqlAggFunction FUNCTION_INSTANCE = new CpcSketchSqlAggFunction(); private static final SqlAggFunction FUNCTION_INSTANCE = new CpcSketchSqlAggFunction();
private static final String NAME = "HLLD"; private static final String NAME = "HLLD";
public HllObjectSqlAggregator(){
super(false);
}
@Override @Override
public SqlAggFunction calciteFunction() { public SqlAggFunction calciteFunction() {
return FUNCTION_INSTANCE; return FUNCTION_INSTANCE;
@@ -24,11 +26,9 @@ public class HllObjectSqlAggregator extends HllBaseSqlAggregator {
protected Aggregation toAggregation( protected Aggregation toAggregation(
String name, String name,
boolean finalizeAggregations, boolean finalizeAggregations,
List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory AggregatorFactory aggregatorFactory
) { ) {
return Aggregation.create( return Aggregation.create(
virtualColumns,
Collections.singletonList(aggregatorFactory), Collections.singletonList(aggregatorFactory),
null null
); );

View File

@@ -1,83 +1,64 @@
package org.apache.druid.query.aggregation.sketch.hlld.sql; package org.apache.druid.query.aggregation.sketch.hlld.sql;
import com.alibaba.fastjson2.JSON;
import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableMap; import com.google.inject.Injector;
import com.google.common.collect.ImmutableSet; import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllModule; import org.apache.druid.query.aggregation.sketch.hlld.HllModule;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.server.QueryStackTests; import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.sql.calcite.QueryTestRunner;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
import org.apache.druid.sql.SqlLifecycleFactory;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.*; import org.junit.*;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.*;
import java.util.List;
import java.util.Map;
public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase { // The new version changes the base class directly; the setup is much simpler
private static final String DATA_SOURCE = "foo"; public class HllApproxCountDistinctSqlAggregatorTest extends BaseCalciteQueryTest {
private static final boolean ROUND = true; private static final boolean ROUND = true;
private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
PlannerContext.CTX_SQL_QUERY_ID, "dummy"
);
private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser;
private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
@Rule @Override
public TemporaryFolder temporaryFolder = new TemporaryFolder(); public void gatherProperties(Properties properties)
{
@Rule super.gatherProperties(properties);
public QueryLogHook queryLogHook = QueryLogHook.create(TestHelper.JSON_MAPPER);
private SpecificSegmentsQuerySegmentWalker walker;
private SqlLifecycleFactory sqlLifecycleFactory;
@BeforeClass
public static void setUpClass() {
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
} }
@AfterClass @Override
public static void tearDownClass() throws IOException { public void configureGuice(DruidInjectorBuilder builder)
resourceCloser.close(); {
super.configureGuice(builder);
builder.addModule(new HllModule());
} }
@Before
public void setUp() throws Exception {
@SuppressWarnings("resource")
@Override
public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final JoinableFactoryWrapper joinableFactory,
final Injector injector
) throws IOException
{
HllModule.registerSerde(); HllModule.registerSerde();
for (Module mod : new HllModule().getJacksonModules()) { for (Module mod : new HllModule().getJacksonModules()) {
CalciteTests.getJsonMapper().registerModule(mod); CalciteTests.getJsonMapper().registerModule(mod);
TestHelper.JSON_MAPPER.registerModule(mod); TestHelper.JSON_MAPPER.registerModule(mod);
} }
final QueryableIndex index = IndexBuilder.create() final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/testIndex-1369101812"));
//final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/9_index"));
/*final QueryableIndex index = IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder()) .tmpDir(temporaryFolder.newFolder())
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema( .schema(
@@ -95,12 +76,12 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
.withRollup(false) .withRollup(false)
.build() .build()
) )
.rows(CalciteTests.ROWS1) .rows(TestDataBuilder.ROWS1)
.buildMMappedIndex(); .buildMMappedIndex();*/
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder() DataSegment.builder()
.dataSource(DATA_SOURCE) .dataSource(CalciteTests.DATASOURCE1)
.interval(index.getDataInterval()) .interval(index.getDataInterval())
.version("1") .version("1")
.shardSpec(new LinearShardSpec(0)) .shardSpec(new LinearShardSpec(0))
@@ -108,45 +89,80 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
.build(), .build(),
index index
); );
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(
new HllApproxCountDistinctSqlAggregator(),
new HllObjectSqlAggregator()
),
ImmutableSet.of(
new HllEstimateOperatorConversion()
)
);
SchemaPlus rootSchema = CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
CalciteTests.createExprMacroTable(),
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
)
);
}
@After
public void tearDown() throws Exception {
walker.close();
walker = null;
} }
@Test @Test
public void testSqlQuery() throws Exception { public void testSqlQuery() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); // Can't vectorize due to SUBSTRING expression.
String sql = "select * from druid.foo"; cannotVectorize();
final List<Object[]> results =
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); String[] columns = new String[]{"__time", "dim1", "dim2", "dim3", "cnt", "hll_dim1", "m1"};
String sql = "select " + String.join(",", columns) + " from druid.foo";
QueryTestBuilder builder = testBuilder().sql(sql);
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
Map row = new LinkedHashMap();
for (int i = 0; i < result.length; i++) {
row.put(columns[i], result[i]);
}
System.out.println(JSON.toJSONString(row));
// System.out.println(Arrays.toString(result));
}
for (int i = 0; i < columns.length; i++) {
Object[] values = new Object[results.size()];
for (int j = 0; j < results.size(); j++) {
values[j] = results.get(j)[i];
}
System.out.println(columns[i] + ":" + Arrays.toString(values));
}
}
@Test
public void testSqlQuery11() throws Exception {
// Can't vectorize due to SUBSTRING expression.
//cannotVectorize();
String sql = "select HLLD(hll_dim1) hll_dim1 from (select hll_dim1 from druid.foo limit 5) t ";
//sql = "select HLLD(hll_dim1) hll_dim1 from druid.foo t ";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQuery12() throws Exception {
// Can't vectorize due to SUBSTRING expression.
cannotVectorize();
String sql = "select * from (select * from druid.foo limit 6) t where __time >= '1970-12-15 07:00:28' and __time < '2023-12-15 08:10:28' ";
QueryTestBuilder builder = testBuilder().sql(sql);
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQuery1() throws Exception {
// Can't vectorize due to SUBSTRING expression.
cannotVectorize();
String sql = "select dim1 from druid.foo";
QueryTestBuilder builder = testBuilder().sql(sql);
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -154,10 +170,100 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testSqlQuery2() throws Exception { public void testSqlQuery2() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); //cannotVectorize();
String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''"; //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = '1'";
final List<Object[]> results = // Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Aggregate expressions cannot be nested
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(HLLD(hll_dim1)), HLLD(hll_dim1) from druid.foo";
String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(hll_dim1), HLLD(hll_dim1) from (select HLLD(hll_dim1) hll_dim1 from druid.foo) t";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQuery3() throws Exception {
//cannotVectorize();
//String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
String sql = "select APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select HLLD(hll_dim1) hll from druid.foo where dim1 = '1') t ";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQuery4() throws Exception {
//cannotVectorize();
//String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
String sql = "select APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select HLLD(hll_dim1) hll from druid.foo where dim1 = '1') t ";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQuery5() throws Exception {
//cannotVectorize();
//String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(hll_dim1) hll from druid.foo where dim1 = '1' group by dim1) t group by dim1";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQuery6() throws Exception {
//cannotVectorize();
//String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
//String sql = "select dim1,HLLD_ESTIMATE(HLLD(hll), false) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQuery62() throws Exception {
//cannotVectorize();
//String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testSqlQuery7() throws Exception {
//cannotVectorize();
//String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1) t group by dim1 limit 10";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -165,26 +271,23 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testAgg() throws Exception { public void testAgg() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n" final String sql = "SELECT\n"
+ " SUM(cnt),\n" + " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n"
+ "FROM druid.foo"; + "FROM druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
} }
@Test @Test
public void testDistinct() throws Exception { public void testDistinct() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n" final String sql = "SELECT\n"
+ " SUM(cnt),\n" + " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_HLLD(dim2),\n" // uppercase + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n" // uppercase
@@ -195,18 +298,17 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
+ "FROM druid.foo"; + "FROM druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
} }
@Test @Test
public void testDistinct2() throws Exception { public void testDistinct2() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n" final String sql = "SELECT\n"
+ " SUM(cnt),\n" + " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_HLLD(dim2),\n" + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n"
@@ -219,8 +321,26 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
+ "FROM druid.foo"; + "FROM druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
public void testDistinctDebug2() throws Exception {
final String sql = "SELECT\n"
+ " dim1, dim2\n"
+ "FROM druid.foo";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -229,15 +349,15 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testDistinctDebug() throws Exception { public void testDistinctDebug() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n" final String sql = "SELECT\n"
+ " SUM(cnt),\n" + " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_HLLD(dim2)\n" + " APPROX_COUNT_DISTINCT_HLLD(dim2)\n"
+ "FROM druid.foo"; + "FROM druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -246,14 +366,14 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testDeser() throws Exception { public void testDeser() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n" final String sql = "SELECT\n"
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1) cnt\n" + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1) cnt\n"
+ "FROM druid.foo"; + "FROM druid.foo";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -263,30 +383,29 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testGroupBy() throws Exception { public void testGroupBy() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT cnt,\n" final String sql = "SELECT cnt,\n"
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt2\n" + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt2\n"
+ "FROM druid.foo group by cnt"; + "FROM druid.foo group by cnt";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
} }
@Test @Test
public void testGroupBy1() throws Exception { public void testGroupBy1() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT __time,\n" final String sql = "SELECT __time,\n"
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n" + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
+ "FROM druid.foo group by __time"; + "FROM druid.foo group by __time";
final List<Object[]> results = QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); builder.run();
QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }
@@ -295,14 +414,13 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
@Test @Test
public void testGroupBy2() throws Exception { public void testGroupBy2() throws Exception {
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT __time,\n" final String sql = "SELECT __time,\n"
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n" + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
+ "FROM druid.foo group by __time order by cnt desc"; + "FROM druid.foo group by __time order by cnt desc";
QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
final List<Object[]> results = builder.run();
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); QueryTestRunner.QueryResults queryResults = builder.results();
List<Object[]> results = queryResults.results;
for (Object[] result : results) { for (Object[] result : results) {
System.out.println(Arrays.toString(result)); System.out.println(Arrays.toString(result));
} }