优化:getMaxIntermediateSize返回值初始化计算一次cache,getMaxIntermediateSize每行数据都会调用一次
This commit is contained in:
@@ -1,281 +1,287 @@
|
||||
package org.apache.druid.query.aggregation.sketch.hlld;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.zdjz.galaxy.sketch.hlld.Hll;
|
||||
import com.zdjz.galaxy.sketch.hlld.HllUnion;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.query.aggregation.*;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.ColumnValueSelector;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class HllAggregatorFactory extends AggregatorFactory {
|
||||
private static final Logger LOG = new Logger(HllAggregatorFactory.class);
|
||||
public static final boolean DEFAULT_ROUND = false;
|
||||
public static final int DEFAULT_PRECISION = 12;
|
||||
|
||||
static final Comparator<Hll> COMPARATOR = Comparator.nullsFirst(Comparator.comparingDouble(Hll::size));
|
||||
|
||||
protected final String name;
|
||||
protected final String fieldName;
|
||||
protected final int precision;
|
||||
protected final boolean round;
|
||||
|
||||
public HllAggregatorFactory(
|
||||
@JsonProperty("name") final String name,
|
||||
@JsonProperty("fieldName") final String fieldName,
|
||||
@JsonProperty("precision") @Nullable final Integer precision,
|
||||
@JsonProperty("round") @Nullable final Boolean round
|
||||
) {
|
||||
if (name == null) {
|
||||
throw new IAE("Must have a valid, non-null aggregator name");
|
||||
}
|
||||
if (fieldName == null) {
|
||||
throw new IAE("Parameter fieldName must be specified");
|
||||
}
|
||||
this.name = name;
|
||||
this.fieldName = fieldName;
|
||||
this.precision = precision == null ? DEFAULT_PRECISION : precision;
|
||||
this.round = round == null ? DEFAULT_ROUND : round;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) {
|
||||
final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
|
||||
return new HllAggregator(selector, precision);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
|
||||
final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
|
||||
return new HllBufferAggregator(
|
||||
selector,
|
||||
precision
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator() {
|
||||
return COMPARATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object combine(Object lhs, Object rhs) {
|
||||
if(lhs == null){
|
||||
return rhs;
|
||||
}else if(rhs == null){
|
||||
return lhs;
|
||||
}else{
|
||||
final HllUnion union = new HllUnion(precision);
|
||||
union.update((Hll) lhs);
|
||||
union.update((Hll) rhs);
|
||||
Hll result = union.getResult();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregateCombiner makeAggregateCombiner() {
|
||||
return new ObjectAggregateCombiner<Hll>() {
|
||||
private HllUnion union = null;
|
||||
|
||||
@Override
|
||||
public void reset(ColumnValueSelector selector) {
|
||||
//LOG.error("HllAggregateCombiner reset:" + "-" + Thread.currentThread().getId() + "-" + this);
|
||||
//union.reset();
|
||||
union = null;
|
||||
fold(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fold(ColumnValueSelector selector) {
|
||||
//LOG.error("HllAggregateCombiner fold:" + "-" + Thread.currentThread().getId() + "-" + this);
|
||||
final Hll hll = (Hll) selector.getObject();
|
||||
if(hll != null){
|
||||
if(union == null){
|
||||
union = new HllUnion(precision);
|
||||
}
|
||||
union.update(hll);
|
||||
}else{
|
||||
//LOG.error("HllAggregateCombiner fold_null:" + "-" + Thread.currentThread().getId() + "-" + this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<Hll> classOfObject() {
|
||||
return Hll.class;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Hll getObject() {
|
||||
//LOG.error("HllAggregateCombiner get:" + "-" + Thread.currentThread().getId() + "-" + this);
|
||||
if(union == null){
|
||||
return null;
|
||||
}else{
|
||||
Hll result = union.getResult();
|
||||
/*if(result.size() == 0){
|
||||
return null;
|
||||
}*/
|
||||
return result;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory getCombiningFactory() {
|
||||
// 千万不能写错,好大一个坑
|
||||
return new HllMergeAggregatorFactory(name, name, precision, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException {
|
||||
if (other.getName().equals(this.getName()) && other instanceof HllAggregatorFactory) {
|
||||
HllAggregatorFactory castedOther = (HllAggregatorFactory) other;
|
||||
|
||||
return new HllMergeAggregatorFactory(name, name,
|
||||
Math.max(precision, castedOther.precision),
|
||||
round || castedOther.round
|
||||
);
|
||||
}
|
||||
|
||||
throw new AggregatorFactoryNotMergeableException(this, other);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AggregatorFactory> getRequiredColumns() {
|
||||
return Collections.singletonList(
|
||||
new HllAggregatorFactory(fieldName, fieldName, precision, round)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory withName(String newName) {
|
||||
return new HllAggregatorFactory(newName, fieldName, precision, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserialize(Object object) {
|
||||
if (object == null) {
|
||||
return null;
|
||||
}
|
||||
return HllUtils.deserializeHll(object);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnType getResultType() {
|
||||
//return round ? ColumnType.LONG : ColumnType.DOUBLE;
|
||||
return getIntermediateType();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Object finalizeComputation(@Nullable Object object) {
|
||||
if (object == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return object;
|
||||
|
||||
/*final Hll hll = (Hll) object;
|
||||
final double estimate = hll.size();
|
||||
|
||||
if (round) {
|
||||
return Math.round(estimate);
|
||||
} else {
|
||||
return estimate;
|
||||
}*/
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getFieldName() {
|
||||
return fieldName;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getPrecision() {
|
||||
return precision;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isRound() {
|
||||
return round;
|
||||
}
|
||||
|
||||
/*
|
||||
没这个方法了, 新版本需要实现getIntermediateType方法
|
||||
@Override
|
||||
public String getTypeName() {
|
||||
return HllModule.HLLD_BUILD_TYPE_NAME;
|
||||
}*/
|
||||
|
||||
@Override
|
||||
public ColumnType getIntermediateType() {
|
||||
return HllModule.BUILD_TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> requiredFields() {
|
||||
return Collections.singletonList(fieldName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxIntermediateSize() {
|
||||
return Hll.getUpdatableSerializationBytes(precision);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey() {
|
||||
return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_BUILD_CACHE_TYPE_ID)
|
||||
.appendString(name).appendString(fieldName)
|
||||
.appendInt(precision).appendBoolean(round)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object o){
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || !getClass().equals(o.getClass())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
HllAggregatorFactory that = (HllAggregatorFactory) o;
|
||||
return name.equals(that.name) && fieldName.equals(that.fieldName) &&
|
||||
precision == that.precision &&
|
||||
round == that.round
|
||||
;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode(){
|
||||
return Objects.hash(name, fieldName, precision, round);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + "{" +
|
||||
"name='" + name + '\'' +
|
||||
", fieldName='" + fieldName + '\'' +
|
||||
", precision=" + precision +
|
||||
", round=" + round +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
package org.apache.druid.query.aggregation.sketch.hlld;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.zdjz.galaxy.sketch.hlld.Hll;
|
||||
import com.zdjz.galaxy.sketch.hlld.HllUnion;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.query.aggregation.*;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.ColumnValueSelector;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class HllAggregatorFactory extends AggregatorFactory {
|
||||
private static final Logger LOG = new Logger(HllAggregatorFactory.class);
|
||||
public static final boolean DEFAULT_ROUND = false;
|
||||
public static final int DEFAULT_PRECISION = 12;
|
||||
|
||||
static final Comparator<Hll> COMPARATOR = Comparator.nullsFirst(Comparator.comparingDouble(Hll::size));
|
||||
|
||||
protected final String name;
|
||||
protected final String fieldName;
|
||||
protected final int precision;
|
||||
protected final boolean round;
|
||||
protected final int updatableSerializationBytes;
|
||||
|
||||
public HllAggregatorFactory(
|
||||
@JsonProperty("name") final String name,
|
||||
@JsonProperty("fieldName") final String fieldName,
|
||||
@JsonProperty("precision") @Nullable final Integer precision,
|
||||
@JsonProperty("round") @Nullable final Boolean round
|
||||
) {
|
||||
if (name == null) {
|
||||
throw new IAE("Must have a valid, non-null aggregator name");
|
||||
}
|
||||
if (fieldName == null) {
|
||||
throw new IAE("Parameter fieldName must be specified");
|
||||
}
|
||||
this.name = name;
|
||||
this.fieldName = fieldName;
|
||||
this.precision = precision == null ? DEFAULT_PRECISION : precision;
|
||||
this.round = round == null ? DEFAULT_ROUND : round;
|
||||
this.updatableSerializationBytes = getUpdatableSerializationBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) {
|
||||
final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
|
||||
return new HllAggregator(selector, precision);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
|
||||
final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
|
||||
return new HllBufferAggregator(
|
||||
selector,
|
||||
precision
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator() {
|
||||
return COMPARATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object combine(Object lhs, Object rhs) {
|
||||
if(lhs == null){
|
||||
return rhs;
|
||||
}else if(rhs == null){
|
||||
return lhs;
|
||||
}else{
|
||||
final HllUnion union = new HllUnion(precision);
|
||||
union.update((Hll) lhs);
|
||||
union.update((Hll) rhs);
|
||||
Hll result = union.getResult();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregateCombiner makeAggregateCombiner() {
|
||||
return new ObjectAggregateCombiner<Hll>() {
|
||||
private HllUnion union = null;
|
||||
|
||||
@Override
|
||||
public void reset(ColumnValueSelector selector) {
|
||||
//LOG.error("HllAggregateCombiner reset:" + "-" + Thread.currentThread().getId() + "-" + this);
|
||||
//union.reset();
|
||||
union = null;
|
||||
fold(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fold(ColumnValueSelector selector) {
|
||||
//LOG.error("HllAggregateCombiner fold:" + "-" + Thread.currentThread().getId() + "-" + this);
|
||||
final Hll hll = (Hll) selector.getObject();
|
||||
if(hll != null){
|
||||
if(union == null){
|
||||
union = new HllUnion(precision);
|
||||
}
|
||||
union.update(hll);
|
||||
}else{
|
||||
//LOG.error("HllAggregateCombiner fold_null:" + "-" + Thread.currentThread().getId() + "-" + this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<Hll> classOfObject() {
|
||||
return Hll.class;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Hll getObject() {
|
||||
//LOG.error("HllAggregateCombiner get:" + "-" + Thread.currentThread().getId() + "-" + this);
|
||||
if(union == null){
|
||||
return null;
|
||||
}else{
|
||||
Hll result = union.getResult();
|
||||
/*if(result.size() == 0){
|
||||
return null;
|
||||
}*/
|
||||
return result;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory getCombiningFactory() {
|
||||
// 千万不能写错,好大一个坑
|
||||
return new HllMergeAggregatorFactory(name, name, precision, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException {
|
||||
if (other.getName().equals(this.getName()) && other instanceof HllAggregatorFactory) {
|
||||
HllAggregatorFactory castedOther = (HllAggregatorFactory) other;
|
||||
|
||||
return new HllMergeAggregatorFactory(name, name,
|
||||
Math.max(precision, castedOther.precision),
|
||||
round || castedOther.round
|
||||
);
|
||||
}
|
||||
|
||||
throw new AggregatorFactoryNotMergeableException(this, other);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AggregatorFactory> getRequiredColumns() {
|
||||
return Collections.singletonList(
|
||||
new HllAggregatorFactory(fieldName, fieldName, precision, round)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory withName(String newName) {
|
||||
return new HllAggregatorFactory(newName, fieldName, precision, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserialize(Object object) {
|
||||
if (object == null) {
|
||||
return null;
|
||||
}
|
||||
return HllUtils.deserializeHll(object);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnType getResultType() {
|
||||
//return round ? ColumnType.LONG : ColumnType.DOUBLE;
|
||||
return getIntermediateType();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Object finalizeComputation(@Nullable Object object) {
|
||||
if (object == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return object;
|
||||
|
||||
/*final Hll hll = (Hll) object;
|
||||
final double estimate = hll.size();
|
||||
|
||||
if (round) {
|
||||
return Math.round(estimate);
|
||||
} else {
|
||||
return estimate;
|
||||
}*/
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getFieldName() {
|
||||
return fieldName;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getPrecision() {
|
||||
return precision;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isRound() {
|
||||
return round;
|
||||
}
|
||||
|
||||
/*
|
||||
没这个方法了, 新版本需要实现getIntermediateType方法
|
||||
@Override
|
||||
public String getTypeName() {
|
||||
return HllModule.HLLD_BUILD_TYPE_NAME;
|
||||
}*/
|
||||
|
||||
@Override
|
||||
public ColumnType getIntermediateType() {
|
||||
return HllModule.BUILD_TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> requiredFields() {
|
||||
return Collections.singletonList(fieldName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxIntermediateSize() {
|
||||
return updatableSerializationBytes == 0? getUpdatableSerializationBytes():updatableSerializationBytes;
|
||||
}
|
||||
|
||||
protected int getUpdatableSerializationBytes(){
|
||||
return Hll.getUpdatableSerializationBytes(precision);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey() {
|
||||
return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_BUILD_CACHE_TYPE_ID)
|
||||
.appendString(name).appendString(fieldName)
|
||||
.appendInt(precision).appendBoolean(round)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object o){
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || !getClass().equals(o.getClass())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
HllAggregatorFactory that = (HllAggregatorFactory) o;
|
||||
return name.equals(that.name) && fieldName.equals(that.fieldName) &&
|
||||
precision == that.precision &&
|
||||
round == that.round
|
||||
;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode(){
|
||||
return Objects.hash(name, fieldName, precision, round);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + "{" +
|
||||
"name='" + name + '\'' +
|
||||
", fieldName='" + fieldName + '\'' +
|
||||
", precision=" + precision +
|
||||
", round=" + round +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,73 +1,73 @@
|
||||
package org.apache.druid.query.aggregation.sketch.hlld;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.zdjz.galaxy.sketch.hlld.Hll;
|
||||
import com.zdjz.galaxy.sketch.hlld.HllUnion;
|
||||
import org.apache.druid.query.aggregation.Aggregator;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.BufferAggregator;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.ColumnValueSelector;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class HllMergeAggregatorFactory extends HllAggregatorFactory{
|
||||
public HllMergeAggregatorFactory(
|
||||
@JsonProperty("name") final String name,
|
||||
@JsonProperty("fieldName") final String fieldName,
|
||||
@JsonProperty("precision") @Nullable final Integer precision,
|
||||
@JsonProperty("round") @Nullable final Boolean round
|
||||
) {
|
||||
super(name, fieldName, precision, round);
|
||||
}
|
||||
|
||||
/*
|
||||
没这个方法了, 新版本需要实现getIntermediateType方法
|
||||
@Override
|
||||
public String getTypeName(){
|
||||
return HllModule.HLLD_TYPE_NAME;
|
||||
}*/
|
||||
|
||||
@Override
|
||||
public ColumnType getIntermediateType() {
|
||||
return HllModule.TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory) {
|
||||
final ColumnValueSelector<Hll> selector = metricFactory.makeColumnValueSelector(getFieldName());
|
||||
return new HllMergeAggregator(
|
||||
selector,
|
||||
precision
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
|
||||
final ColumnValueSelector<Hll> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
|
||||
return new HllMergeBufferAggregator(
|
||||
selector,
|
||||
precision
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory withName(String newName) {
|
||||
return new HllMergeAggregatorFactory(newName, fieldName, precision, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey() {
|
||||
return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_MERGE_CACHE_TYPE_ID)
|
||||
.appendString(name).appendString(fieldName)
|
||||
.appendInt(precision).appendBoolean(round)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxIntermediateSize() {
|
||||
return HllUnion.getUpdatableSerializationBytes(precision);
|
||||
}
|
||||
}
|
||||
package org.apache.druid.query.aggregation.sketch.hlld;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.zdjz.galaxy.sketch.hlld.Hll;
|
||||
import com.zdjz.galaxy.sketch.hlld.HllUnion;
|
||||
import org.apache.druid.query.aggregation.Aggregator;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.BufferAggregator;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.ColumnValueSelector;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class HllMergeAggregatorFactory extends HllAggregatorFactory{
|
||||
public HllMergeAggregatorFactory(
|
||||
@JsonProperty("name") final String name,
|
||||
@JsonProperty("fieldName") final String fieldName,
|
||||
@JsonProperty("precision") @Nullable final Integer precision,
|
||||
@JsonProperty("round") @Nullable final Boolean round
|
||||
) {
|
||||
super(name, fieldName, precision, round);
|
||||
}
|
||||
|
||||
/*
|
||||
没这个方法了, 新版本需要实现getIntermediateType方法
|
||||
@Override
|
||||
public String getTypeName(){
|
||||
return HllModule.HLLD_TYPE_NAME;
|
||||
}*/
|
||||
|
||||
@Override
|
||||
public ColumnType getIntermediateType() {
|
||||
return HllModule.TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory) {
|
||||
final ColumnValueSelector<Hll> selector = metricFactory.makeColumnValueSelector(getFieldName());
|
||||
return new HllMergeAggregator(
|
||||
selector,
|
||||
precision
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
|
||||
final ColumnValueSelector<Hll> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
|
||||
return new HllMergeBufferAggregator(
|
||||
selector,
|
||||
precision
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory withName(String newName) {
|
||||
return new HllMergeAggregatorFactory(newName, fieldName, precision, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey() {
|
||||
return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_MERGE_CACHE_TYPE_ID)
|
||||
.appendString(name).appendString(fieldName)
|
||||
.appendInt(precision).appendBoolean(round)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getUpdatableSerializationBytes() {
|
||||
return HllUnion.getUpdatableSerializationBytes(precision);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,111 +1,114 @@
|
||||
package org.apache.druid.query.aggregation.sketch.hlld;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.zdjz.galaxy.sketch.hlld.Hll;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.PostAggregator;
|
||||
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
public class HllToEstimatePostAggregator implements PostAggregator {
|
||||
private final String name;
|
||||
private final PostAggregator field;
|
||||
private final boolean round;
|
||||
|
||||
@JsonCreator
|
||||
public HllToEstimatePostAggregator(
|
||||
@JsonProperty("name") final String name,
|
||||
@JsonProperty("field") final PostAggregator field,
|
||||
@JsonProperty("round") boolean round
|
||||
) {
|
||||
this.name = name;
|
||||
this.field = field;
|
||||
this.round = round;
|
||||
}
|
||||
|
||||
// 新版本需要实现的方法
|
||||
@Override
|
||||
public ColumnType getType(ColumnInspector signature) {
|
||||
return round ? ColumnType.LONG : ColumnType.DOUBLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public PostAggregator getField() {
|
||||
return field;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isRound() {
|
||||
return round;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getDependentFields() {
|
||||
return field.getDependentFields();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<Double> getComparator() {
|
||||
return ArithmeticPostAggregator.DEFAULT_COMPARATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object compute(final Map<String, Object> combinedAggregators) {
|
||||
final Hll sketch = (Hll) field.compute(combinedAggregators);
|
||||
return round ? Math.round(sketch.size()) : sketch.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PostAggregator decorate(final Map<String, AggregatorFactory> aggregators) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HllToEstimatePostAggregator{" +
|
||||
"name='" + name + '\'' +
|
||||
", field=" + field +
|
||||
", round=" + round +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof HllToEstimatePostAggregator)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final HllToEstimatePostAggregator that = (HllToEstimatePostAggregator) o;
|
||||
return name.equals(that.name) && field.equals(that.field) && round == that.round;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(name, field, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey() {
|
||||
CacheKeyBuilder builder = new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_TO_ESTIMATE_CACHE_TYPE_ID)
|
||||
.appendCacheable(field).appendBoolean(round);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
}
|
||||
package org.apache.druid.query.aggregation.sketch.hlld;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.zdjz.galaxy.sketch.hlld.Hll;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.PostAggregator;
|
||||
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
public class HllToEstimatePostAggregator implements PostAggregator {
|
||||
private final String name;
|
||||
private final PostAggregator field;
|
||||
private final boolean round;
|
||||
|
||||
@JsonCreator
|
||||
public HllToEstimatePostAggregator(
|
||||
@JsonProperty("name") final String name,
|
||||
@JsonProperty("field") final PostAggregator field,
|
||||
@JsonProperty("round") boolean round
|
||||
) {
|
||||
this.name = name;
|
||||
this.field = field;
|
||||
this.round = round;
|
||||
}
|
||||
|
||||
// 新版本需要实现的方法
|
||||
@Override
|
||||
public ColumnType getType(ColumnInspector signature) {
|
||||
return round ? ColumnType.LONG : ColumnType.DOUBLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public PostAggregator getField() {
|
||||
return field;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isRound() {
|
||||
return round;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getDependentFields() {
|
||||
return field.getDependentFields();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<Double> getComparator() {
|
||||
return ArithmeticPostAggregator.DEFAULT_COMPARATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object compute(final Map<String, Object> combinedAggregators) {
|
||||
final Hll sketch = (Hll) field.compute(combinedAggregators);
|
||||
if(sketch == null){
|
||||
return round ? 0L: 0D;
|
||||
}
|
||||
return round ? Math.round(sketch.size()) : sketch.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PostAggregator decorate(final Map<String, AggregatorFactory> aggregators) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HllToEstimatePostAggregator{" +
|
||||
"name='" + name + '\'' +
|
||||
", field=" + field +
|
||||
", round=" + round +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof HllToEstimatePostAggregator)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final HllToEstimatePostAggregator that = (HllToEstimatePostAggregator) o;
|
||||
return name.equals(that.name) && field.equals(that.field) && round == that.round;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(name, field, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey() {
|
||||
CacheKeyBuilder builder = new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_TO_ESTIMATE_CACHE_TYPE_ID)
|
||||
.appendCacheable(field).appendBoolean(round);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user