From 26bb13fd74de55d59df0a5d1563c4ed48c8458c6 Mon Sep 17 00:00:00 2001 From: lifengchao Date: Mon, 25 Sep 2023 10:42:16 +0800 Subject: [PATCH] =?UTF-8?q?druid=20hlld=E5=8D=87=E7=BA=A7=E5=88=B026.0.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- druid-hlld/pom.xml | 22 +- .../sketch/hlld/HllAggregatorFactory.java | 29 +- .../hlld/HllMergeAggregatorFactory.java | 14 + .../aggregation/sketch/hlld/HllModule.java | 4 + .../hlld/HllToEstimatePostAggregator.java | 8 + .../HllApproxCountDistinctSqlAggregator.java | 20 +- .../sketch/hlld/sql/HllBaseSqlAggregator.java | 58 ++-- .../sql/HllEstimateOperatorConversion.java | 16 +- .../hlld/sql/HllObjectSqlAggregator.java | 8 +- ...lApproxCountDistinctSqlAggregatorTest.java | 304 ++++++++++-------- 10 files changed, 298 insertions(+), 185 deletions(-) diff --git a/druid-hlld/pom.xml b/druid-hlld/pom.xml index 87cb21a..2360f83 100644 --- a/druid-hlld/pom.xml +++ b/druid-hlld/pom.xml @@ -5,7 +5,7 @@ 4.0.0 org.apache.druid.extensions - druid-hlld_0.18.1 + druid-hlld_26.0.0 druid-hlld 1.0-SNAPSHOT @@ -14,7 +14,7 @@ UTF-8 1.8 1.8 - 0.18.1 + 26.0.0 @@ -33,6 +33,14 @@ + + + org.easymock + easymock + 4.3 + test + + org.apache.druid druid-processing @@ -42,9 +50,17 @@ org.apache.druid - druid-benchmarks + druid-server ${druid.version} test + test-jar + + + org.apache.druid + druid-sql + ${druid.version} + test-jar + test junit diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java index ea1fad9..6b6dd08 100644 --- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java +++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java @@ -9,6 +9,7 @@ import org.apache.druid.query.aggregation.*; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnType; import javax.annotation.Nullable; import java.util.Collections; @@ -145,9 +146,9 @@ public class HllAggregatorFactory extends AggregatorFactory { Math.max(precision, castedOther.precision), round || castedOther.round ); - } else { - throw new AggregatorFactoryNotMergeableException(this, other); } + + throw new AggregatorFactoryNotMergeableException(this, other); } @Override @@ -157,25 +158,38 @@ public class HllAggregatorFactory extends AggregatorFactory { ); } + @Override + public AggregatorFactory withName(String newName) { + return new HllAggregatorFactory(newName, fieldName, precision, round); + } + @Override public Object deserialize(Object object) { return HllUtils.deserializeHll(object); } + @Override + public ColumnType getResultType() { + return round ? ColumnType.LONG : ColumnType.DOUBLE; + } + @Nullable @Override public Object finalizeComputation(@Nullable Object object) { if (object == null) { return null; } - final Hll hll = (Hll) object; + + return object; + + /*final Hll hll = (Hll) object; final double estimate = hll.size(); if (round) { return Math.round(estimate); } else { return estimate; - } + }*/ } @Override @@ -199,9 +213,16 @@ public class HllAggregatorFactory extends AggregatorFactory { return round; } + /* + 没这个方法了, 新版本需要实现getIntermediateType方法 @Override public String getTypeName() { return HllModule.HLLD_BUILD_TYPE_NAME; + }*/ + + @Override + public ColumnType getIntermediateType() { + return HllModule.BUILD_TYPE; } @Override diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java index 6a80f0c..03f4846 100644 --- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java +++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java @@ -4,10 +4,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.zdjz.galaxy.sketch.hlld.Hll; import com.zdjz.galaxy.sketch.hlld.HllUnion; import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnType; import javax.annotation.Nullable; @@ -21,9 +23,16 @@ public class HllMergeAggregatorFactory extends HllAggregatorFactory{ super(name, fieldName, precision, round); } + /* + 没这个方法了, 新版本需要实现getIntermediateType方法 @Override public String getTypeName(){ return HllModule.HLLD_TYPE_NAME; + }*/ + + @Override + public ColumnType getIntermediateType() { + return HllModule.TYPE; } @Override @@ -44,6 +53,11 @@ public class HllMergeAggregatorFactory extends HllAggregatorFactory{ ); } + @Override + public AggregatorFactory withName(String newName) { + return new HllMergeAggregatorFactory(newName, fieldName, precision, round); + } + @Override public byte[] getCacheKey() { return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_MERGE_CACHE_TYPE_ID) diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java index 49879c4..7982dcf 100644 --- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java +++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java @@ -10,6 +10,7 @@ import org.apache.druid.initialization.DruidModule; import org.apache.druid.query.aggregation.sketch.hlld.sql.HllApproxCountDistinctSqlAggregator; import org.apache.druid.query.aggregation.sketch.hlld.sql.HllEstimateOperatorConversion; import org.apache.druid.query.aggregation.sketch.hlld.sql.HllObjectSqlAggregator; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.sql.guice.SqlBindings; @@ -24,6 +25,9 @@ public class HllModule implements DruidModule { public static final String HLLD_TYPE_NAME = "HLLDSketch"; public static final String HLLD_BUILD_TYPE_NAME = "HLLDSketchBuild"; + public static final ColumnType TYPE = ColumnType.ofComplex(HLLD_TYPE_NAME); + public static final ColumnType BUILD_TYPE = ColumnType.ofComplex(HLLD_BUILD_TYPE_NAME); + @Override public void configure(Binder binder) { diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java index 8f4a949..5a11005 100644 --- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java +++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java @@ -7,6 +7,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnType; import java.util.Comparator; import java.util.Map; @@ -29,6 +31,12 @@ public class HllToEstimatePostAggregator implements PostAggregator { this.round = round; } + // 新版本需要实现的方法 + @Override + public ColumnType getType(ColumnInspector signature) { + return round ? ColumnType.LONG : ColumnType.DOUBLE; + } + @Override @JsonProperty public String getName() { diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java index 4971063..c35b087 100644 --- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java +++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java @@ -5,36 +5,44 @@ import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.*; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; -import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory; +import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator; import org.apache.druid.sql.calcite.aggregation.Aggregation; import java.util.Collections; -import java.util.List; public class HllApproxCountDistinctSqlAggregator extends HllBaseSqlAggregator { private static final SqlAggFunction FUNCTION_INSTANCE = new CPCSketchApproxCountDistinctSqlAggFunction(); private static final String NAME = "APPROX_COUNT_DISTINCT_HLLD"; + public HllApproxCountDistinctSqlAggregator(){ + super(true); + } + @Override public SqlAggFunction calciteFunction() { return FUNCTION_INSTANCE; } + // 新版本参数少了virtualColumns @Override protected Aggregation toAggregation( String name, boolean finalizeAggregations, - List virtualColumns, AggregatorFactory aggregatorFactory ) { return Aggregation.create( - virtualColumns, Collections.singletonList(aggregatorFactory), //感觉是否是最外层的函数吧 - finalizeAggregations ? new FinalizingFieldAccessPostAggregator( + finalizeAggregations ? new HllToEstimatePostAggregator( name, - aggregatorFactory.getName() + new FieldAccessPostAggregator( + aggregatorFactory.getName(), + aggregatorFactory.getName() + ), + ((HllAggregatorFactory)aggregatorFactory).isRound() ) : null ); } diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java index 4bdcf82..a065a4e 100644 --- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java +++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java @@ -2,6 +2,7 @@ package org.apache.druid.query.aggregation.sketch.hlld.sql; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; @@ -14,6 +15,7 @@ import org.apache.druid.query.aggregation.sketch.hlld.HllMergeAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.sql.calcite.aggregation.Aggregation; @@ -29,6 +31,13 @@ import java.util.ArrayList; import java.util.List; public abstract class HllBaseSqlAggregator implements SqlAggregator { + + private final boolean finalizeSketch; + + protected HllBaseSqlAggregator(boolean finalizeSketch){ + this.finalizeSketch = finalizeSketch; + } + @Nullable @Override public Aggregation toDruidAggregation( @@ -93,13 +102,14 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator { round = HllAggregatorFactory.DEFAULT_ROUND; } - final List virtualColumns = new ArrayList<>(); + // 新版本删除了final List virtualColumns = new ArrayList<>(); final AggregatorFactory aggregatorFactory; - final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name; + //final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name; + final String aggregatorName = finalizeSketch ? Calcites.makePrefixedName(name, "a") : name; - // 输入是Cpc,返回HllMergeAggregatorFactory + // 输入是Hll,返回HllSketchMergeAggregatorFactory if (columnArg.isDirectColumnAccess() - && rowSignature.getColumnType(columnArg.getDirectColumn()).orElse(null) == ValueType.COMPLEX) { + && rowSignature.getColumnType(columnArg.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) { // 这就是具体的聚合函数吧 aggregatorFactory = new HllMergeAggregatorFactory( aggregatorName, @@ -109,10 +119,10 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator { ); } else { // 输入是regular column,HllBuildAggregatorFactory - final SqlTypeName sqlTypeName = columnRexNode.getType().getSqlTypeName(); - final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName); + final RelDataType dataType = columnRexNode.getType(); + final ColumnType inputType = Calcites.getColumnTypeForRelDataType(dataType); if (inputType == null) { - throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName); + throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", dataType.getSqlTypeName(), aggregatorName); } final DimensionSpec dimensionSpec; @@ -120,27 +130,34 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator { if (columnArg.isDirectColumnAccess()) { dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null, inputType); } else { - VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( - plannerContext, + String virtualColumnName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( columnArg, - sqlTypeName + dataType ); - dimensionSpec = new DefaultDimensionSpec(virtualColumn.getOutputName(), null, inputType); - virtualColumns.add(virtualColumn); + dimensionSpec = new DefaultDimensionSpec(virtualColumnName, null, inputType); } - aggregatorFactory = new HllAggregatorFactory( - aggregatorName, - dimensionSpec.getDimension(), - precision, - round - ); + // 新版本的判断,输入是Hll + if (inputType.is(ValueType.COMPLEX)) { + aggregatorFactory = new HllMergeAggregatorFactory( + aggregatorName, + dimensionSpec.getOutputName(), + precision, + round + ); + } else { + aggregatorFactory = new HllAggregatorFactory( + aggregatorName, + dimensionSpec.getDimension(), + precision, + round + ); + } } return toAggregation( name, - finalizeAggregations, - virtualColumns, + finalizeSketch, aggregatorFactory ); } @@ -148,7 +165,6 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator { protected abstract Aggregation toAggregation( String name, boolean finalizeAggregations, - List virtualColumns, AggregatorFactory aggregatorFactory ); } diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java index 071d41b..41e38cb 100644 --- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java +++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java @@ -13,16 +13,15 @@ import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory; import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.expression.DirectOperatorConversion; -import org.apache.druid.sql.calcite.expression.DruidExpression; -import org.apache.druid.sql.calcite.expression.OperatorConversions; -import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.expression.*; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; import java.util.List; -public class HllEstimateOperatorConversion extends DirectOperatorConversion { +// postAggregator, toDruidExpression返回null。相当于post udf和普通udf是不一样的。 +// 新版本直接修改了父类 +public class HllEstimateOperatorConversion implements SqlOperatorConversion { private static final String FUNCTION_NAME = "HLLD_ESTIMATE"; private static final SqlFunction SQL_FUNCTION = OperatorConversions .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME)) @@ -32,9 +31,7 @@ public class HllEstimateOperatorConversion extends DirectOperatorConversion { .returnTypeInference(ReturnTypes.DOUBLE) .build(); - public HllEstimateOperatorConversion() { - super(SQL_FUNCTION, FUNCTION_NAME); - } + // 新版本少了构造函数 @Override public SqlOperator calciteOperator() { @@ -63,7 +60,8 @@ public class HllEstimateOperatorConversion extends DirectOperatorConversion { plannerContext, rowSignature, operands.get(0), - postAggregatorVisitor + postAggregatorVisitor, + true // 新版本多了个参数 ); if (firstOperand == null) { diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java index 58bbd45..f0e7da6 100644 --- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java +++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java @@ -5,16 +5,18 @@ import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.*; import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.segment.VirtualColumn; import org.apache.druid.sql.calcite.aggregation.Aggregation; import java.util.Collections; -import java.util.List; public class HllObjectSqlAggregator extends HllBaseSqlAggregator { private static final SqlAggFunction FUNCTION_INSTANCE = new CpcSketchSqlAggFunction(); private static final String NAME = "HLLD"; + public HllObjectSqlAggregator(){ + super(false); + } + @Override public SqlAggFunction calciteFunction() { return FUNCTION_INSTANCE; @@ -24,11 +26,9 @@ public class HllObjectSqlAggregator extends HllBaseSqlAggregator { protected Aggregation toAggregation( String name, boolean finalizeAggregations, - List virtualColumns, AggregatorFactory aggregatorFactory ) { return Aggregation.create( - virtualColumns, Collections.singletonList(aggregatorFactory), null ); diff --git a/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java b/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java index 6a9f3a1..8bdc4eb 100644 --- a/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java +++ b/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java @@ -1,83 +1,64 @@ package org.apache.druid.query.aggregation.sketch.hlld.sql; +import com.alibaba.fastjson2.JSON; import com.fasterxml.jackson.databind.Module; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.druid.java.util.common.io.Closer; +import com.google.inject.Injector; +import org.apache.druid.guice.DruidInjectorBuilder; import org.apache.druid.query.QueryRunnerFactoryConglomerate; -import org.apache.druid.query.aggregation.CountAggregatorFactory; -import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; -import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory; import org.apache.druid.query.aggregation.sketch.hlld.HllModule; -import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.incremental.IncrementalIndexSchema; -import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; -import org.apache.druid.server.QueryStackTests; -import org.apache.druid.server.security.AuthTestUtils; -import org.apache.druid.server.security.AuthenticationResult; -import org.apache.druid.sql.SqlLifecycle; -import org.apache.druid.sql.SqlLifecycleFactory; -import org.apache.druid.sql.calcite.planner.DruidOperatorTable; -import org.apache.druid.sql.calcite.planner.PlannerConfig; -import org.apache.druid.sql.calcite.planner.PlannerContext; -import org.apache.druid.sql.calcite.planner.PlannerFactory; -import org.apache.druid.sql.calcite.util.CalciteTestBase; +import org.apache.druid.segment.join.JoinableFactoryWrapper; +import org.apache.druid.sql.calcite.BaseCalciteQueryTest; +import org.apache.druid.sql.calcite.QueryTestBuilder; +import org.apache.druid.sql.calcite.QueryTestRunner; import org.apache.druid.sql.calcite.util.CalciteTests; -import org.apache.druid.sql.calcite.util.QueryLogHook; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; import org.junit.*; -import org.junit.rules.TemporaryFolder; +import java.io.File; import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; -public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase { - private static final String DATA_SOURCE = "foo"; +// 新版本父类直接变了,实现更简单了 +public class HllApproxCountDistinctSqlAggregatorTest extends BaseCalciteQueryTest { private static final boolean ROUND = true; - private static final Map QUERY_CONTEXT_DEFAULT = ImmutableMap.of( - PlannerContext.CTX_SQL_QUERY_ID, "dummy" - ); - private static QueryRunnerFactoryConglomerate conglomerate; - private static Closer resourceCloser; - private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT; - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Rule - public QueryLogHook queryLogHook = QueryLogHook.create(TestHelper.JSON_MAPPER); - - private SpecificSegmentsQuerySegmentWalker walker; - private SqlLifecycleFactory sqlLifecycleFactory; - - @BeforeClass - public static void setUpClass() { - resourceCloser = Closer.create(); - conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser); + @Override + public void gatherProperties(Properties properties) + { + super.gatherProperties(properties); } - @AfterClass - public static void tearDownClass() throws IOException { - resourceCloser.close(); + @Override + public void configureGuice(DruidInjectorBuilder builder) + { + super.configureGuice(builder); + builder.addModule(new HllModule()); } - @Before - public void setUp() throws Exception { + + + @SuppressWarnings("resource") + @Override + public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( + final QueryRunnerFactoryConglomerate conglomerate, + final JoinableFactoryWrapper joinableFactory, + final Injector injector + ) throws IOException + { HllModule.registerSerde(); for (Module mod : new HllModule().getJacksonModules()) { CalciteTests.getJsonMapper().registerModule(mod); TestHelper.JSON_MAPPER.registerModule(mod); } - final QueryableIndex index = IndexBuilder.create() + final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/testIndex-1369101812")); + //final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/9_index")); + /*final QueryableIndex index = IndexBuilder.create() .tmpDir(temporaryFolder.newFolder()) .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) .schema( @@ -95,12 +76,12 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase { .withRollup(false) .build() ) - .rows(CalciteTests.ROWS1) - .buildMMappedIndex(); + .rows(TestDataBuilder.ROWS1) + .buildMMappedIndex();*/ - walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( + return new SpecificSegmentsQuerySegmentWalker(conglomerate).add( DataSegment.builder() - .dataSource(DATA_SOURCE) + .dataSource(CalciteTests.DATASOURCE1) .interval(index.getDataInterval()) .version("1") .shardSpec(new LinearShardSpec(0)) @@ -108,45 +89,47 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase { .build(), index ); - - final PlannerConfig plannerConfig = new PlannerConfig(); - final DruidOperatorTable operatorTable = new DruidOperatorTable( - ImmutableSet.of( - new HllApproxCountDistinctSqlAggregator(), - new HllObjectSqlAggregator() - ), - ImmutableSet.of( - new HllEstimateOperatorConversion() - ) - ); - - SchemaPlus rootSchema = CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER); - sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory( - new PlannerFactory( - rootSchema, - CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), - operatorTable, - CalciteTests.createExprMacroTable(), - plannerConfig, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - CalciteTests.getJsonMapper(), - CalciteTests.DRUID_SCHEMA_NAME - ) - ); - } - - @After - public void tearDown() throws Exception { - walker.close(); - walker = null; } @Test public void testSqlQuery() throws Exception { - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); - String sql = "select * from druid.foo"; - final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); + // Can't vectorize due to SUBSTRING expression. + cannotVectorize(); + String[] columns = new String[]{"__time", "dim1", "dim2", "dim3", "cnt", "hll_dim1", "m1"}; + + String sql = "select " + String.join(",", columns) + " from druid.foo"; + QueryTestBuilder builder = testBuilder().sql(sql); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; + for (Object[] result : results) { + Map row = new LinkedHashMap(); + for (int i = 0; i < result.length; i++) { + row.put(columns[i], result[i]); + } + System.out.println(JSON.toJSONString(row)); + // System.out.println(Arrays.toString(result)); + } + + for (int i = 0; i < columns.length; i++) { + Object[] values = new Object[results.size()]; + for (int j = 0; j < results.size(); j++) { + values[j] = results.get(j)[i]; + } + System.out.println(columns[i] + ":" + Arrays.toString(values)); + } + } + + @Test + public void testSqlQuery1() throws Exception { + // Can't vectorize due to SUBSTRING expression. + cannotVectorize(); + + String sql = "select dim1 from druid.foo"; + QueryTestBuilder builder = testBuilder().sql(sql); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; for (Object[] result : results) { System.out.println(Arrays.toString(result)); } @@ -154,37 +137,67 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase { @Test public void testSqlQuery2() throws Exception { - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); - String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''"; - final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); + //cannotVectorize(); + //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = '1'"; + // Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Aggregate expressions cannot be nested + //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(HLLD(hll_dim1)), HLLD(hll_dim1) from druid.foo"; + String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(hll_dim1), HLLD(hll_dim1) from (select HLLD(hll_dim1) hll_dim1 from druid.foo) t"; + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; for (Object[] result : results) { System.out.println(Arrays.toString(result)); } } @Test - public void testAgg() throws Exception { - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + public void testSqlQuery3() throws Exception { + //cannotVectorize(); + //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''"; + String sql = "select APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select HLLD(hll_dim1) hll from druid.foo where dim1 = '1') t "; + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; + for (Object[] result : results) { + System.out.println(Arrays.toString(result)); + } + } + @Test + public void testSqlQuery4() throws Exception { + //cannotVectorize(); + //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''"; + String sql = "select APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select HLLD(hll_dim1) hll from druid.foo where dim1 = '1') t "; + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; + for (Object[] result : results) { + System.out.println(Arrays.toString(result)); + } + } + + @Test + public void testAgg() throws Exception { final String sql = "SELECT\n" + " SUM(cnt),\n" + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" + "FROM druid.foo"; - final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); - for (Object[] result : results) { - System.out.println(Arrays.toString(result)); - } + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; + for (Object[] result : results) { + System.out.println(Arrays.toString(result)); + } } - - @Test + @Test public void testDistinct() throws Exception { - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); - final String sql = "SELECT\n" + " SUM(cnt),\n" + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n" // uppercase @@ -195,18 +208,17 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase { + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column + "FROM druid.foo"; - final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); - for (Object[] result : results) { - System.out.println(Arrays.toString(result)); - } - + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; + for (Object[] result : results) { + System.out.println(Arrays.toString(result)); + } } @Test public void testDistinct2() throws Exception { - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); - final String sql = "SELECT\n" + " SUM(cnt),\n" + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n" @@ -219,8 +231,26 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase { + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column + "FROM druid.foo"; - final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; + for (Object[] result : results) { + System.out.println(Arrays.toString(result)); + } + + } + + @Test + public void testDistinctDebug2() throws Exception { + final String sql = "SELECT\n" + + " dim1, dim2\n" + + "FROM druid.foo"; + + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; for (Object[] result : results) { System.out.println(Arrays.toString(result)); } @@ -229,15 +259,15 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase { @Test public void testDistinctDebug() throws Exception { - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); - final String sql = "SELECT\n" + " SUM(cnt),\n" + " APPROX_COUNT_DISTINCT_HLLD(dim2)\n" + "FROM druid.foo"; - final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; for (Object[] result : results) { System.out.println(Arrays.toString(result)); } @@ -246,14 +276,14 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase { @Test public void testDeser() throws Exception { - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); - final String sql = "SELECT\n" + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1) cnt\n" + "FROM druid.foo"; - final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; for (Object[] result : results) { System.out.println(Arrays.toString(result)); } @@ -263,30 +293,29 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase { @Test public void testGroupBy() throws Exception { - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); - final String sql = "SELECT cnt,\n" + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt2\n" + "FROM druid.foo group by cnt"; - final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; for (Object[] result : results) { System.out.println(Arrays.toString(result)); } - } @Test public void testGroupBy1() throws Exception { - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); - final String sql = "SELECT __time,\n" + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n" + "FROM druid.foo group by __time"; - final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; for (Object[] result : results) { System.out.println(Arrays.toString(result)); } @@ -295,14 +324,13 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase { @Test public void testGroupBy2() throws Exception { - SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); - final String sql = "SELECT __time,\n" + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n" + "FROM druid.foo group by __time order by cnt desc"; - - final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); + QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize(); + builder.run(); + QueryTestRunner.QueryResults queryResults = builder.results(); + List results = queryResults.results; for (Object[] result : results) { System.out.println(Arrays.toString(result)); }