From 8c546e20d76eff7474043242a678e8fa8780e11b Mon Sep 17 00:00:00 2001
From: lifengchao
Date: Fri, 9 Aug 2024 11:30:47 +0800
Subject: [PATCH] TSG-22013 Add dimension_bucket function to compute dimension buckets
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 druid-udf/pom.xml                             | 143 +++++++++++++++++
 druid-udf/src/assembly/assembly.xml           |  54 +++++++
 .../org/apache/druid/query/udf/UdfModule.java |  23 +++
 .../expressions/DimensionBucketExprMacro.java |  82 ++++++++++
 .../query/udf/expressions/IntToHexUtil.java   |  45 ++++++
 .../DimensionBucketOperatorConversion.java    |  43 ++++++
 ...rg.apache.druid.initialization.DruidModule |   1 +
 .../expressions/DimensionBucketExprTest.java  | 146 ++++++++++++++++++
 8 files changed, 537 insertions(+)
 create mode 100644 druid-udf/pom.xml
 create mode 100644 druid-udf/src/assembly/assembly.xml
 create mode 100644 druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java
 create mode 100644 druid-udf/src/main/java/org/apache/druid/query/udf/expressions/DimensionBucketExprMacro.java
 create mode 100644 druid-udf/src/main/java/org/apache/druid/query/udf/expressions/IntToHexUtil.java
 create mode 100644 druid-udf/src/main/java/org/apache/druid/query/udf/sql/DimensionBucketOperatorConversion.java
 create mode 100644 druid-udf/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
 create mode 100644 druid-udf/src/test/java/org/apache/druid/query/udf/expressions/DimensionBucketExprTest.java

diff --git a/druid-udf/pom.xml b/druid-udf/pom.xml
new file mode 100644
index 0000000..07b2f76
--- /dev/null
+++ b/druid-udf/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.example</groupId>
+    <artifactId>druid-udf_26.0.0</artifactId>
+    <name>druid-udf</name>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <druid.version>26.0.0</druid.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-server</artifactId>
+            <version>${druid.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-sql</artifactId>
+            <version>${druid.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>4.3</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${druid.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-server</artifactId>
+            <version>${druid.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-sql</artifactId>
+            <version>${druid.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.fastjson2</groupId>
+            <artifactId>fastjson2</artifactId>
+            <version>2.0.34</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <compilerArgument>-Xlint:unchecked</compilerArgument>
+                    <source>11</source>
+                    <target>11</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Duser.timezone=UTC</argLine>
+                    <useSystemClassLoader>true</useSystemClassLoader>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.5.5</version>
+                <executions>
+                    <execution>
+                        <id>distro-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <tarLongFileMode>posix</tarLongFileMode>
+                            <descriptors>
+                                <descriptor>src/assembly/assembly.xml</descriptor>
+                            </descriptors>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-release-plugin</artifactId>
+                <version>2.5.3</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.maven.scm</groupId>
+                        <artifactId>maven-scm-provider-gitexe</artifactId>
+                        <version>1.9.4</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <archive>
+                        <addMavenDescriptor>false</addMavenDescriptor>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/druid-udf/src/assembly/assembly.xml b/druid-udf/src/assembly/assembly.xml
new file mode 100644
index 0000000..8fb4519
--- /dev/null
+++ b/druid-udf/src/assembly/assembly.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+    <id>bin</id>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <baseDirectory>${project.name}</baseDirectory>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+            <outputDirectory>.</outputDirectory>
+            <unpack>false</unpack>
+        </dependencySet>
+    </dependencySets>
+
+    <fileSets>
+        <fileSet>
+            <directory>.</directory>
+            <includes>
+                <include>README.md</include>
+                <include>LICENSE</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.build.directory}</directory>
+            <outputDirectory>.</outputDirectory>
+            <includes>
+                <include>*.jar</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+</assembly>
\ No newline at end of file
diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java
new file mode 100644
index 0000000..d09e02a
--- /dev/null
+++ b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java
@@ -0,0 +1,23 @@
+package org.apache.druid.query.udf;
+
+import com.google.inject.Binder;
+import org.apache.druid.guice.ExpressionModule;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.query.udf.expressions.DimensionBucketExprMacro;
+import org.apache.druid.query.udf.sql.DimensionBucketOperatorConversion;
+import org.apache.druid.sql.guice.SqlBindings;
+
+public class UdfModule implements DruidModule {
+    @Override
+    public void configure(Binder binder) {
+        SqlBindings.addOperatorConversion(binder, DimensionBucketOperatorConversion.class);
+        ExpressionModule.addExprMacro(binder, DimensionBucketExprMacro.class);
+    }
+
+    /*@Override
+    public List<? extends Module> getJacksonModules() {
+        // Register Jackson module for any classes we need to be able to use in JSON queries or ingestion specs.
+        return Collections.singletonList(new SimpleModule("UdfModule"));
+    }*/
+
+}
diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/DimensionBucketExprMacro.java b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/DimensionBucketExprMacro.java
new file mode 100644
index 0000000..da8c6b6
--- /dev/null
+++ b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/DimensionBucketExprMacro.java
@@ -0,0 +1,82 @@
+package org.apache.druid.query.udf.expressions;
+
+import org.apache.druid.math.expr.*;
+import org.apache.druid.math.expr.ExprMacroTable.ExprMacro;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class DimensionBucketExprMacro implements ExprMacro {
+    private static final String NAME = "dimension_bucket";
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args) {
+        validationHelperCheckMinArgumentCount(args, 2);
+        Expr bucketCnt = args.get(0);
+        if (!bucketCnt.isLiteral() || bucketCnt.eval(InputBindings.nilBindings()).asInt() <= 0) {
+            throw validationFailed("first argument (bucketCount) must be an integer literal greater than 0");
+        }
+        return new DimensionBucketExpr(args);
+    }
+
+    static class DimensionBucketExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr {
+        private final int bucketCount;
+
+        public DimensionBucketExpr(List<Expr> args) {
+            super(NAME, args);
+            bucketCount = args.get(0).eval(InputBindings.nilBindings()).asInt();
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings) {
+            int result = 1;
+            for (int i = 1; i < args.size(); i++) {
+                ExprEval eval = args.get(i).eval(bindings);
+                Object element = eval.value();
+                if (element instanceof Object[]) {
+                    for (Object ele : (Object[]) element) {
+                        result = 31 * result + (ele == null ? 0 : ele.hashCode());
+                    }
+                } else {
+                    result = 31 * result + (element == null ?
+                            0 : element.hashCode());
+                }
+
+                /*else if (element instanceof Number) {
+                    //result = 31 * result + Integer.hashCode(((Number)element).intValue());
+                    result = 31 * result + Long.hashCode(((Number)element).longValue());
+                }*/
+            }
+
+            // abs of the remainder (not of result) so Integer.MIN_VALUE cannot yield a negative bucket
+            int bucket = Math.abs(result % bucketCount);
+            return ExprEval.of(IntToHexUtil.uInt16ToHexStringFast(bucket));
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle) {
+            List<Expr> newArgs = args.stream().map(x -> x.visit(shuttle)).collect(Collectors.toList());
+            return shuttle.visit(new DimensionBucketExpr(newArgs));
+        }
+
+        @Override
+        public BindingAnalysis analyzeInputs() {
+            return super.analyzeInputs();
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector) {
+            return ExpressionType.STRING;
+        }
+
+        @Override
+        public boolean canVectorize(InputBindingInspector inspector) {
+            return false;
+        }
+    }
+}
diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/IntToHexUtil.java b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/IntToHexUtil.java
new file mode 100644
index 0000000..cb133e5
--- /dev/null
+++ b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/IntToHexUtil.java
@@ -0,0 +1,45 @@
+package org.apache.druid.query.udf.expressions;
+
+import java.nio.charset.StandardCharsets;
+
+public class IntToHexUtil {
+    static final byte[] digits = {
+            '0', '1', '2', '3', '4', '5',
+            '6', '7', '8', '9', 'a', 'b',
+            'c', 'd', 'e', 'f', 'g', 'h',
+            'i', 'j', 'k', 'l', 'm', 'n',
+            'o', 'p', 'q', 'r', 's', 't',
+            'u', 'v', 'w', 'x', 'y', 'z'
+    };
+    static final String[] uInt16HexsCache;
+    static final int uInt16HexsCacheSize = 8192;
+
+    static {
+        uInt16HexsCache = new String[uInt16HexsCacheSize];
+        for (int i = 0; i < uInt16HexsCacheSize; i++) {
+            uInt16HexsCache[i] = uInt16ToHexString(i);
+        }
+    }
+
+    public static String uInt16ToHexStringFast(int i) {
+        if (i < uInt16HexsCacheSize) {
+            return uInt16HexsCache[i];
+        } else {
+            return uInt16ToHexString(i);
+        }
+    }
+
+    private static String uInt16ToHexString(int i) {
+        byte[] bytes = new byte[4];
+        int mask = 15; // 16 - 1
+        int value = i;
+        bytes[3] = digits[value & mask];
+        value >>>= 4;
+        bytes[2] = digits[value & mask];
+        value >>>= 4;
+        bytes[1] = digits[value & mask];
+        value >>>= 4;
+        bytes[0] = digits[value & mask];
+        return new String(bytes, StandardCharsets.US_ASCII);
+    }
+}
diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/sql/DimensionBucketOperatorConversion.java b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/DimensionBucketOperatorConversion.java
new file mode 100644
index 0000000..2aff9cb
--- /dev/null
+++ b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/DimensionBucketOperatorConversion.java
@@ -0,0 +1,43 @@
+package org.apache.druid.query.udf.sql;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.OperatorConversions;
+import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+
+import javax.annotation.Nullable;
+
+public class DimensionBucketOperatorConversion implements SqlOperatorConversion {
+    private static final SqlFunction SQL_FUNCTION = new SqlFunction(
+            "DIMENSION_BUCKET",
+            SqlKind.OTHER_FUNCTION,
+            ReturnTypes.explicit(
+                    factory -> Calcites.createSqlTypeWithNullability(factory, SqlTypeName.VARCHAR, true)
+            ),
+            null,
+            OperandTypes.variadic(SqlOperandCountRanges.from(2)),
+            SqlFunctionCategory.USER_DEFINED_FUNCTION
+    );
+
+    @Override
+    public SqlOperator calciteOperator() {
+        return SQL_FUNCTION;
+    }
+
+    @Nullable
+    @Override
+    public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode) {
+        return OperatorConversions.convertDirectCall(plannerContext, rowSignature, rexNode, "dimension_bucket");
+    }
+}
diff --git a/druid-udf/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/druid-udf/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 0000000..ab6de6b
--- /dev/null
+++ b/druid-udf/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1 @@
+org.apache.druid.query.udf.UdfModule
\ No newline at end of file
diff --git a/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/DimensionBucketExprTest.java b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/DimensionBucketExprTest.java
new file mode 100644
index 0000000..51d2ce0
--- /dev/null
+++ b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/DimensionBucketExprTest.java
@@ -0,0 +1,146 @@
+package org.apache.druid.query.udf.expressions;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.math.expr.*;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class DimensionBucketExprTest extends InitializedNullHandlingTest {
+    private final ExprMacroTable exprMacroTable = new ExprMacroTable(Collections.singletonList(new DimensionBucketExprMacro()));
+
+    Expr.ObjectBinding inputBindings = InputBindings.forInputSuppliers(
+            new ImmutableMap.Builder()
+                    .put("string", InputBindings.inputSupplier(ExpressionType.STRING, () -> "abcdef"))
+                    .put("long", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1234L))
+                    .put("double", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> 1.234))
+                    .put("array1", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"1", "2", "3"}))
+                    .put("array2", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new String[]{"1", "2", "3"}))
+                    .put("nullString", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                    .put("nullLong", InputBindings.inputSupplier(ExpressionType.LONG, () -> null))
+                    .put("nullDouble", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> null))
+                    .build()
+    );
+
+    Expr.ObjectBinding[] inputBindingArray = new Expr.ObjectBinding[]{
+            InputBindings.forInputSuppliers(
+                    new ImmutableMap.Builder()
+                            .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+                            .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+                            .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+                            .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+                            .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+                            .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+                            .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+                            .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+                            .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .build()
+            ),
+            InputBindings.forInputSuppliers(
+                    new ImmutableMap.Builder()
+                            .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+                            .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+                            .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+                            .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+                            .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+                            .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"5","7","8"}))
+                            .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+                            .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+                            .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .build()
+            ),
+            InputBindings.forInputSuppliers(
+                    new ImmutableMap.Builder()
+                            .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+                            .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+                            .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+                            .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+                            .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L))
+                            .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+                            .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+                            .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
+                            .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51"))
+                            .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .build()
+            ),
+            // ...
+            InputBindings.forInputSuppliers(
+                    new ImmutableMap.Builder()
+                            .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+                            .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+                            .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+                            .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
+                            .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+                            .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .build()
+            ),
+            InputBindings.forInputSuppliers(
+                    new ImmutableMap.Builder()
+                            .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+                            .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+                            .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+                            .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
+                            .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
+                            .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5,7,8"))
+                            .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .build()
+            ),
+            InputBindings.forInputSuppliers(
+                    new ImmutableMap.Builder()
+                            .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
+                            .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+                            .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+                            .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
+                            .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L))
+                            .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51"))
+                            .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
+                            .build()
+            ),
+    };
+
+    @Test
+    public void test() {
+        Expr expr = Parser.parse("dimension_bucket(1024, 100, 'aaa', string,long,double,array1, array2, nullString, nullLong)", exprMacroTable);
+        ExprEval eval = expr.eval(inputBindings);
+        System.out.println(eval.value());
+    }
+
+    @Test
+    public void test2() {
+        for (Expr.ObjectBinding objectBinding : inputBindingArray) {
+            Expr expr = Parser.parse("dimension_bucket(1024, device_id, rule_id, template_id, chart_id, version, client_ip_object, server_ip_object, fqdn_category, client_ip, server_ip, server_fqdn, server_domain, application)", exprMacroTable);
+            ExprEval eval = expr.eval(objectBinding);
+            System.out.println(objectBinding.get("rule_id") + ", bucket_id:" + eval.value());
+        }
+    }
+}
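For reference, once the druid-udf jar is deployed as a Druid extension (for example by placing it in the
extensions directory and listing it in druid.extensions.loadList), the new operator can be called from
Druid SQL. A hypothetical query is sketched below; the datasource and column names are illustrative and
not part of this patch:

    SELECT
      DIMENSION_BUCKET(1024, device_id, rule_id, template_id) AS bucket_id,
      COUNT(*) AS cnt
    FROM network_events
    GROUP BY 1

The first argument is the bucket count and must be a positive integer literal; the remaining arguments
are the dimension values hashed together, and the result is the bucket id rendered by IntToHexUtil as a
four-character hex string.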