Upgrade druid hlld to 26.0.0
@@ -9,6 +9,7 @@ import org.apache.druid.query.aggregation.*;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnType;

import javax.annotation.Nullable;
import java.util.Collections;
@@ -145,9 +146,9 @@ public class HllAggregatorFactory extends AggregatorFactory {
          Math.max(precision, castedOther.precision),
          round || castedOther.round
      );
    } else {
      throw new AggregatorFactoryNotMergeableException(this, other);
    }

    throw new AggregatorFactoryNotMergeableException(this, other);
  }

  @Override
@@ -157,25 +158,38 @@ public class HllAggregatorFactory extends AggregatorFactory {
    );
  }

  @Override
  public AggregatorFactory withName(String newName) {
    return new HllAggregatorFactory(newName, fieldName, precision, round);
  }

  @Override
  public Object deserialize(Object object) {
    return HllUtils.deserializeHll(object);
  }

  @Override
  public ColumnType getResultType() {
    return round ? ColumnType.LONG : ColumnType.DOUBLE;
  }

  @Nullable
  @Override
  public Object finalizeComputation(@Nullable Object object) {
    if (object == null) {
      return null;
    }
    final Hll hll = (Hll) object;

    return object;

    /*final Hll hll = (Hll) object;
    final double estimate = hll.size();

    if (round) {
      return Math.round(estimate);
    } else {
      return estimate;
    }
    }*/
  }

  @Override
@@ -199,9 +213,16 @@ public class HllAggregatorFactory extends AggregatorFactory {
    return round;
  }

  /*
  This method is gone; the new version needs to implement getIntermediateType instead.
  @Override
  public String getTypeName() {
    return HllModule.HLLD_BUILD_TYPE_NAME;
  }*/

  @Override
  public ColumnType getIntermediateType() {
    return HllModule.BUILD_TYPE;
  }

  @Override

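Note: the hunks above replace the removed getTypeName() with the Druid 26 type methods. For reference, a minimal sketch of the type-related overrides an aggregator factory needs after the upgrade; the class name, the round field, and the complex type constant are hypothetical stand-ins, and the remaining abstract methods are left out by keeping the sketch abstract.

import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnType;

public abstract class TypeAwareFactorySketch extends AggregatorFactory
{
  // Stand-in for HllModule.BUILD_TYPE: a complex type registered under a custom name.
  private static final ColumnType INTERMEDIATE_TYPE = ColumnType.ofComplex("HLLDSketchBuild");

  private final boolean round;

  protected TypeAwareFactorySketch(boolean round)
  {
    this.round = round;
  }

  // Replaces the removed getTypeName(): the type of the unfinalized, serialized aggregate.
  @Override
  public ColumnType getIntermediateType()
  {
    return INTERMEDIATE_TYPE;
  }

  // The type of the finalized value handed back to queries.
  @Override
  public ColumnType getResultType()
  {
    return round ? ColumnType.LONG : ColumnType.DOUBLE;
  }
}
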
@@ -4,10 +4,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.zdjz.galaxy.sketch.hlld.Hll;
import com.zdjz.galaxy.sketch.hlld.HllUnion;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnType;

import javax.annotation.Nullable;
@@ -21,9 +23,16 @@ public class HllMergeAggregatorFactory extends HllAggregatorFactory{
    super(name, fieldName, precision, round);
  }

  /*
  This method is gone; the new version needs to implement getIntermediateType instead.
  @Override
  public String getTypeName(){
    return HllModule.HLLD_TYPE_NAME;
  }*/

  @Override
  public ColumnType getIntermediateType() {
    return HllModule.TYPE;
  }

  @Override

@@ -44,6 +53,11 @@ public class HllMergeAggregatorFactory extends HllAggregatorFactory{
    );
  }

  @Override
  public AggregatorFactory withName(String newName) {
    return new HllMergeAggregatorFactory(newName, fieldName, precision, round);
  }

  @Override
  public byte[] getCacheKey() {
    return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_MERGE_CACHE_TYPE_ID)

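Note: the getCacheKey() hunk is cut off above. As a rough illustration of the CacheKeyBuilder pattern it starts (the id constants and the fields appended after the type id byte are assumptions here, not taken from the commit):

import org.apache.druid.query.cache.CacheKeyBuilder;

public class CacheKeySketch
{
  // Hypothetical ids standing in for HllModule.CACHE_TYPE_ID_OFFSET / HLLD_MERGE_CACHE_TYPE_ID.
  private static final byte CACHE_TYPE_ID_OFFSET = 0x50;
  private static final byte MERGE_CACHE_TYPE_ID = 0x02;

  public static byte[] buildCacheKey(String fieldName, int precision, boolean round)
  {
    return new CacheKeyBuilder(CACHE_TYPE_ID_OFFSET)
        .appendByte(MERGE_CACHE_TYPE_ID)
        .appendString(fieldName)   // which input column feeds the sketch
        .appendInt(precision)      // HLL precision participates in the key
        .appendBoolean(round)      // rounding changes the finalized result
        .build();
  }
}
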
@@ -10,6 +10,7 @@ import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllApproxCountDistinctSqlAggregator;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllEstimateOperatorConversion;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllObjectSqlAggregator;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.sql.guice.SqlBindings;

@@ -24,6 +25,9 @@ public class HllModule implements DruidModule {

  public static final String HLLD_TYPE_NAME = "HLLDSketch";
  public static final String HLLD_BUILD_TYPE_NAME = "HLLDSketchBuild";
  public static final ColumnType TYPE = ColumnType.ofComplex(HLLD_TYPE_NAME);
  public static final ColumnType BUILD_TYPE = ColumnType.ofComplex(HLLD_BUILD_TYPE_NAME);

  @Override
  public void configure(Binder binder) {

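Note: the HllModule hunk only adds the ColumnType constants; the configure() body is outside the hunk. A sketch of how such a module is typically wired in Druid 26 follows; only the class names come from this diff, while the sketch's class name, the type-name-to-factory mapping, and the omission of serde registration are assumptions.

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllMergeAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllApproxCountDistinctSqlAggregator;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllEstimateOperatorConversion;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllObjectSqlAggregator;
import org.apache.druid.sql.guice.SqlBindings;

import java.util.Collections;
import java.util.List;

public class HllModuleSketch implements DruidModule
{
  @Override
  public List<? extends Module> getJacksonModules()
  {
    // Register JSON type names so {"type": "HLLDSketchBuild", ...} deserializes to a factory;
    // which name maps to which factory class is an assumption, not shown in the diff.
    return Collections.singletonList(
        new SimpleModule("HllModuleSketch").registerSubtypes(
            new NamedType(HllAggregatorFactory.class, "HLLDSketchBuild"),
            new NamedType(HllMergeAggregatorFactory.class, "HLLDSketch")
        )
    );
  }

  @Override
  public void configure(Binder binder)
  {
    // Expose the SQL surface added by this extension; these classes appear in the diff.
    SqlBindings.addAggregator(binder, HllApproxCountDistinctSqlAggregator.class);
    SqlBindings.addAggregator(binder, HllObjectSqlAggregator.class);
    SqlBindings.addOperatorConversion(binder, HllEstimateOperatorConversion.class);
    // Complex-type serdes would also be registered here (e.g. via ComplexMetrics.registerSerde),
    // but the serde classes are outside this diff.
  }
}
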
@@ -7,6 +7,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;

import java.util.Comparator;
import java.util.Map;

@@ -29,6 +31,12 @@ public class HllToEstimatePostAggregator implements PostAggregator {
    this.round = round;
  }

  // Method the new version requires implementing
  @Override
  public ColumnType getType(ColumnInspector signature) {
    return round ? ColumnType.LONG : ColumnType.DOUBLE;
  }

  @Override
  @JsonProperty
  public String getName() {

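Note: getType(ColumnInspector) above advertises LONG when round is set and DOUBLE otherwise. A small sketch of the matching finalization rule, based on the code commented out in HllAggregatorFactory earlier in this commit (the Hll.size() call is taken from that commented block, so treat the exact API as an assumption):

import com.zdjz.galaxy.sketch.hlld.Hll;

public class EstimateSketch
{
  // Mirrors the round handling shown (commented out) in HllAggregatorFactory.finalizeComputation.
  public static Object estimate(Hll hll, boolean round)
  {
    final double estimate = hll.size(); // raw HLL estimate
    if (round) {
      return Math.round(estimate);      // boxes to Long, matching ColumnType.LONG
    }
    return estimate;                    // boxes to Double, matching ColumnType.DOUBLE
  }
}
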
@@ -5,36 +5,44 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.*;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator;
import org.apache.druid.sql.calcite.aggregation.Aggregation;

import java.util.Collections;
import java.util.List;

public class HllApproxCountDistinctSqlAggregator extends HllBaseSqlAggregator {
  private static final SqlAggFunction FUNCTION_INSTANCE = new CPCSketchApproxCountDistinctSqlAggFunction();
  private static final String NAME = "APPROX_COUNT_DISTINCT_HLLD";

  public HllApproxCountDistinctSqlAggregator(){
    super(true);
  }

  @Override
  public SqlAggFunction calciteFunction() {
    return FUNCTION_INSTANCE;
  }

  // In the new version the virtualColumns parameter is gone
  @Override
  protected Aggregation toAggregation(
      String name,
      boolean finalizeAggregations,
      List<VirtualColumn> virtualColumns,
      AggregatorFactory aggregatorFactory
  ) {
    return Aggregation.create(
        virtualColumns,
        Collections.singletonList(aggregatorFactory),
        // Presumably this flags whether it is the outermost function
        finalizeAggregations ? new FinalizingFieldAccessPostAggregator(
        finalizeAggregations ? new HllToEstimatePostAggregator(
            name,
            aggregatorFactory.getName()
            new FieldAccessPostAggregator(
                aggregatorFactory.getName(),
                aggregatorFactory.getName()
            ),
            ((HllAggregatorFactory) aggregatorFactory).isRound()
        ) : null
    );
  }

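Note: because the old and new bodies of toAggregation are interleaved in the hunk above, here is a sketch of the shape the method takes after the upgrade, reconstructed from those lines as an assumption rather than copied from the commit: Aggregation.create no longer receives virtual columns, and the finalizing branch wraps the factory in HllToEstimatePostAggregator over a FieldAccessPostAggregator.

import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator;
import org.apache.druid.sql.calcite.aggregation.Aggregation;

import java.util.Collections;

public class ToAggregationSketch
{
  // name: output column; finalize: whether the query wants the numeric estimate rather than the sketch.
  static Aggregation toAggregation(String name, boolean finalize, AggregatorFactory factory)
  {
    return Aggregation.create(
        Collections.singletonList(factory),
        finalize
        ? new HllToEstimatePostAggregator(
            name,
            new FieldAccessPostAggregator(factory.getName(), factory.getName()),
            ((HllAggregatorFactory) factory).isRound()
        )
        : null
    );
  }
}
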
@@ -2,6 +2,7 @@ package org.apache.druid.query.aggregation.sketch.hlld.sql;

import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;

@@ -14,6 +15,7 @@ import org.apache.druid.query.aggregation.sketch.hlld.HllMergeAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;

@@ -29,6 +31,13 @@ import java.util.ArrayList;
import java.util.List;

public abstract class HllBaseSqlAggregator implements SqlAggregator {

  private final boolean finalizeSketch;

  protected HllBaseSqlAggregator(boolean finalizeSketch){
    this.finalizeSketch = finalizeSketch;
  }

  @Nullable
  @Override
  public Aggregation toDruidAggregation(
@@ -93,13 +102,14 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
      round = HllAggregatorFactory.DEFAULT_ROUND;
    }

    final List<VirtualColumn> virtualColumns = new ArrayList<>();
    // The new version removed: final List<VirtualColumn> virtualColumns = new ArrayList<>();
    final AggregatorFactory aggregatorFactory;
    final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
    //final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
    final String aggregatorName = finalizeSketch ? Calcites.makePrefixedName(name, "a") : name;

    // Input is a Cpc: return HllMergeAggregatorFactory
    // Input is an Hll: return HllSketchMergeAggregatorFactory
    if (columnArg.isDirectColumnAccess()
        && rowSignature.getColumnType(columnArg.getDirectColumn()).orElse(null) == ValueType.COMPLEX) {
        && rowSignature.getColumnType(columnArg.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) {
      // This is presumably the concrete aggregation function
      aggregatorFactory = new HllMergeAggregatorFactory(
          aggregatorName,

@@ -109,10 +119,10 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
      );
    } else {
      // Input is a regular column: HllBuildAggregatorFactory
      final SqlTypeName sqlTypeName = columnRexNode.getType().getSqlTypeName();
      final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName);
      final RelDataType dataType = columnRexNode.getType();
      final ColumnType inputType = Calcites.getColumnTypeForRelDataType(dataType);
      if (inputType == null) {
        throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName);
        throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", dataType.getSqlTypeName(), aggregatorName);
      }

      final DimensionSpec dimensionSpec;
@@ -120,27 +130,34 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
      if (columnArg.isDirectColumnAccess()) {
        dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null, inputType);
      } else {
        VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
            plannerContext,
        String virtualColumnName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
            columnArg,
            sqlTypeName
            dataType
        );
        dimensionSpec = new DefaultDimensionSpec(virtualColumn.getOutputName(), null, inputType);
        virtualColumns.add(virtualColumn);
        dimensionSpec = new DefaultDimensionSpec(virtualColumnName, null, inputType);
      }

      aggregatorFactory = new HllAggregatorFactory(
          aggregatorName,
          dimensionSpec.getDimension(),
          precision,
          round
      );
      // New-version check: the input is an Hll
      if (inputType.is(ValueType.COMPLEX)) {
        aggregatorFactory = new HllMergeAggregatorFactory(
            aggregatorName,
            dimensionSpec.getOutputName(),
            precision,
            round
        );
      } else {
        aggregatorFactory = new HllAggregatorFactory(
            aggregatorName,
            dimensionSpec.getDimension(),
            precision,
            round
        );
      }
    }

    return toAggregation(
        name,
        finalizeAggregations,
        virtualColumns,
        finalizeSketch,
        aggregatorFactory
    );
  }

@@ -148,7 +165,6 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
  protected abstract Aggregation toAggregation(
      String name,
      boolean finalizeAggregations,
      List<VirtualColumn> virtualColumns,
      AggregatorFactory aggregatorFactory
  );
}

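Note: the registry-related changes above are easy to miss in the flattened hunk: getOrCreateVirtualColumnForExpression no longer takes plannerContext, it returns the generated column name rather than a VirtualColumn, and there is no local virtualColumns list to maintain. A small sketch of the new pattern (the helper method and its shape are illustrative; the registry and expression come from toDruidAggregation's parameters):

import org.apache.calcite.rel.type.RelDataType;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;

public class VirtualColumnSketch
{
  static DimensionSpec dimensionSpecFor(
      VirtualColumnRegistry virtualColumnRegistry,
      DruidExpression columnArg,
      RelDataType dataType
  )
  {
    final ColumnType inputType = Calcites.getColumnTypeForRelDataType(dataType);
    // New API: returns the registered virtual column's name; no plannerContext argument,
    // and nothing to add to a local virtualColumns list any more.
    final String virtualColumnName =
        virtualColumnRegistry.getOrCreateVirtualColumnForExpression(columnArg, dataType);
    return new DefaultDimensionSpec(virtualColumnName, null, inputType);
  }
}
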
@@ -13,16 +13,15 @@ import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.*;
import org.apache.druid.sql.calcite.planner.PlannerContext;

import javax.annotation.Nullable;
import java.util.List;

public class HllEstimateOperatorConversion extends DirectOperatorConversion {
// As a postAggregator, toDruidExpression returns null; in effect a post UDF differs from an ordinary UDF.
// The new version changes the parent type directly
public class HllEstimateOperatorConversion implements SqlOperatorConversion {
  private static final String FUNCTION_NAME = "HLLD_ESTIMATE";
  private static final SqlFunction SQL_FUNCTION = OperatorConversions
      .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))

@@ -32,9 +31,7 @@ public class HllEstimateOperatorConversion extends DirectOperatorConversion {
      .returnTypeInference(ReturnTypes.DOUBLE)
      .build();

  public HllEstimateOperatorConversion() {
    super(SQL_FUNCTION, FUNCTION_NAME);
  }
  // The new version no longer has this constructor

  @Override
  public SqlOperator calciteOperator() {

@@ -63,7 +60,8 @@ public class HllEstimateOperatorConversion extends DirectOperatorConversion {
        plannerContext,
        rowSignature,
        operands.get(0),
        postAggregatorVisitor
        postAggregatorVisitor,
        true // The new version adds an extra parameter
    );

    if (firstOperand == null) {

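Note: with DirectOperatorConversion no longer extended, the class implements SqlOperatorConversion directly: calciteOperator() exposes the SQL function and toDruidExpression() returns null because HLLD_ESTIMATE is only meaningful as a post-aggregation. A minimal sketch; the sketch's class name, function name, and operand-type settings are assumptions, and the toPostAggregator override (where the extra boolean parameter appears above) is omitted.

import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;

import javax.annotation.Nullable;

public class EstimateConversionSketch implements SqlOperatorConversion
{
  // Hypothetical function name to avoid clashing with the real HLLD_ESTIMATE operator.
  private static final SqlFunction SQL_FUNCTION = OperatorConversions
      .operatorBuilder("HLLD_ESTIMATE_SKETCH")
      .operandTypes(SqlTypeFamily.ANY)
      .returnTypeInference(ReturnTypes.DOUBLE)
      .build();

  @Override
  public SqlOperator calciteOperator()
  {
    return SQL_FUNCTION;
  }

  // Post-aggregation-only functions return null here; the real work happens in toPostAggregator.
  @Nullable
  @Override
  public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode)
  {
    return null;
  }
}
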
@@ -5,16 +5,18 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.*;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.aggregation.Aggregation;

import java.util.Collections;
import java.util.List;

public class HllObjectSqlAggregator extends HllBaseSqlAggregator {
  private static final SqlAggFunction FUNCTION_INSTANCE = new CpcSketchSqlAggFunction();
  private static final String NAME = "HLLD";

  public HllObjectSqlAggregator(){
    super(false);
  }

  @Override
  public SqlAggFunction calciteFunction() {
    return FUNCTION_INSTANCE;

@@ -24,11 +26,9 @@ public class HllObjectSqlAggregator extends HllBaseSqlAggregator {
  protected Aggregation toAggregation(
      String name,
      boolean finalizeAggregations,
      List<VirtualColumn> virtualColumns,
      AggregatorFactory aggregatorFactory
  ) {
    return Aggregation.create(
        virtualColumns,
        Collections.singletonList(aggregatorFactory),
        null
    );