From 5765edf67121f9291cebb9bf173d1627e7c7e9ca Mon Sep 17 00:00:00 2001 From: lifengchao Date: Thu, 17 Oct 2024 18:26:38 +0800 Subject: [PATCH] =?UTF-8?q?TSG-22756=20=E6=B7=BB=E5=8A=A0current=5Ftimesta?= =?UTF-8?q?mp=5Fmillis=20udf=E8=BE=93=E5=87=BA=E5=BD=93=E5=89=8D=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E6=88=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/druid/query/udf/UdfModule.java | 4 + .../CurrentTimestampMillisExprMacro.java | 57 +++++++ ...rentTimestampMillisOperatorConversion.java | 35 +++++ .../CurrentTimestampMillisExprTest.java | 148 ++++++++++++++++++ 4 files changed, 244 insertions(+) create mode 100644 druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java create mode 100644 druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java create mode 100644 druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java index d09e02a..51248c4 100644 --- a/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java @@ -3,7 +3,9 @@ package org.apache.druid.query.udf; import com.google.inject.Binder; import org.apache.druid.guice.ExpressionModule; import org.apache.druid.initialization.DruidModule; +import org.apache.druid.query.udf.expressions.CurrentTimestampMillisExprMacro; import org.apache.druid.query.udf.expressions.DimensionBucketExprMacro; +import org.apache.druid.query.udf.sql.CurrentTimestampMillisOperatorConversion; import org.apache.druid.query.udf.sql.DimensionBucketOperatorConversion; import org.apache.druid.sql.guice.SqlBindings; @@ -11,7 +13,9 @@ public class UdfModule implements DruidModule { @Override public void configure(Binder binder) { SqlBindings.addOperatorConversion(binder, DimensionBucketOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, CurrentTimestampMillisOperatorConversion.class); ExpressionModule.addExprMacro(binder, DimensionBucketExprMacro.class); + ExpressionModule.addExprMacro(binder, CurrentTimestampMillisExprMacro.class); } /*@Override diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java new file mode 100644 index 0000000..9535223 --- /dev/null +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java @@ -0,0 +1,57 @@ +package org.apache.druid.query.udf.expressions; + +import org.apache.druid.math.expr.*; + +import javax.annotation.Nullable; +import java.util.List; + +public class CurrentTimestampMillisExprMacro implements ExprMacroTable.ExprMacro { + private static final String NAME = "current_timestamp_millis"; // current_timestamp_millis + + @Override + public String name() { + return NAME; + } + + @Override + public Expr apply(List args) { + validationHelperCheckArgumentCount(args, 0); + + class CurrentTimestampMillisExpr implements Expr { + + @Override + public ExprEval eval(ObjectBinding bindings) { + return ExprEval.of(System.currentTimeMillis()); + } + + @Override + public String stringify() { + return "current_timestamp_millis"; + } + + @Override + public Expr visit(Shuttle shuttle) { + return shuttle.visit(this); + } + + @Override + public BindingAnalysis analyzeInputs() { + return BindingAnalysis.EMTPY; + } + + @Nullable + @Override + public ExpressionType getOutputType(InputBindingInspector inspector) { + return ExpressionType.LONG; + } + + @Override + public boolean canVectorize(InputBindingInspector inspector) { + return false; + } + } + + return new CurrentTimestampMillisExpr(); + } + +} diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java new file mode 100644 index 0000000..f75f5c7 --- /dev/null +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java @@ -0,0 +1,35 @@ +package org.apache.druid.query.udf.sql; + +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.*; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.OperatorConversions; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; +import org.apache.druid.sql.calcite.planner.PlannerContext; + +import javax.annotation.Nullable; + +public class CurrentTimestampMillisOperatorConversion implements SqlOperatorConversion { + private static final SqlFunction SQL_FUNCTION = OperatorConversions + .operatorBuilder("CURRENT_TIMESTAMP_MILLIS") + .operandTypes(SqlTypeFamily.ANY) + .requiredOperands(0) + .returnTypeNonNull(SqlTypeName.BIGINT) + .functionCategory(SqlFunctionCategory.USER_DEFINED_FUNCTION) + .build(); + + @Override + public SqlOperator calciteOperator() { + return SQL_FUNCTION; + } + + @Nullable + @Override + public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode) { + return OperatorConversions.convertDirectCall(plannerContext, rowSignature, rexNode, "current_timestamp_millis"); + } +} diff --git a/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java new file mode 100644 index 0000000..db83dd9 --- /dev/null +++ b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java @@ -0,0 +1,148 @@ +package org.apache.druid.query.udf.expressions; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.math.expr.*; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Test; + +import java.util.Collections; + +public class CurrentTimestampMillisExprTest extends InitializedNullHandlingTest { + private final ExprMacroTable exprMacroTable = new ExprMacroTable(Collections.singletonList(new CurrentTimestampMillisExprMacro())); + Expr.ObjectBinding inputBindings = InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("string", InputBindings.inputSupplier(ExpressionType.STRING, () -> "abcdef")) + .put("long", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1234L)) + .put("double", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> 1.234)) + .put("array1", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"1", "2", "3"})) + .put("array2", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new String[]{"1", "2", "3"})) + .put("nullString", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("nullLong", InputBindings.inputSupplier(ExpressionType.LONG, () -> null)) + .put("nullDouble", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> null)) + .build() + ); + + Expr.ObjectBinding[] inputBindingArray = new Expr.ObjectBinding[]{ + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"5","7","8"})) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51")) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + // ... + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5,7,8")) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51")) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + }; + + @Test + public void test() throws Exception{ + Expr expr = Parser.parse("current_timestamp_millis()", exprMacroTable); + System.out.println(expr.analyzeInputs().getRequiredBindings()); + ExprEval eval = expr.eval(inputBindings); + System.out.println(eval.value()); + Thread.sleep(1000); + eval = expr.eval(inputBindings); + System.out.println(eval.value()); + Thread.sleep(1000); + expr = Parser.parse("current_timestamp_millis()", exprMacroTable); + eval = expr.eval(inputBindings); + System.out.println(eval.value()); + + } + + +}