diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java index d09e02a..51248c4 100644 --- a/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/UdfModule.java @@ -3,7 +3,9 @@ package org.apache.druid.query.udf; import com.google.inject.Binder; import org.apache.druid.guice.ExpressionModule; import org.apache.druid.initialization.DruidModule; +import org.apache.druid.query.udf.expressions.CurrentTimestampMillisExprMacro; import org.apache.druid.query.udf.expressions.DimensionBucketExprMacro; +import org.apache.druid.query.udf.sql.CurrentTimestampMillisOperatorConversion; import org.apache.druid.query.udf.sql.DimensionBucketOperatorConversion; import org.apache.druid.sql.guice.SqlBindings; @@ -11,7 +13,9 @@ public class UdfModule implements DruidModule { @Override public void configure(Binder binder) { SqlBindings.addOperatorConversion(binder, DimensionBucketOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, CurrentTimestampMillisOperatorConversion.class); ExpressionModule.addExprMacro(binder, DimensionBucketExprMacro.class); + ExpressionModule.addExprMacro(binder, CurrentTimestampMillisExprMacro.class); } /*@Override diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java new file mode 100644 index 0000000..9535223 --- /dev/null +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprMacro.java @@ -0,0 +1,57 @@ +package org.apache.druid.query.udf.expressions; + +import org.apache.druid.math.expr.*; + +import javax.annotation.Nullable; +import java.util.List; + +public class CurrentTimestampMillisExprMacro implements ExprMacroTable.ExprMacro { + private static final String NAME = "current_timestamp_millis"; // current_timestamp_millis + + @Override + public String name() { + return NAME; + } + + @Override + public Expr apply(List args) { + validationHelperCheckArgumentCount(args, 0); + + class CurrentTimestampMillisExpr implements Expr { + + @Override + public ExprEval eval(ObjectBinding bindings) { + return ExprEval.of(System.currentTimeMillis()); + } + + @Override + public String stringify() { + return "current_timestamp_millis"; + } + + @Override + public Expr visit(Shuttle shuttle) { + return shuttle.visit(this); + } + + @Override + public BindingAnalysis analyzeInputs() { + return BindingAnalysis.EMTPY; + } + + @Nullable + @Override + public ExpressionType getOutputType(InputBindingInspector inspector) { + return ExpressionType.LONG; + } + + @Override + public boolean canVectorize(InputBindingInspector inspector) { + return false; + } + } + + return new CurrentTimestampMillisExpr(); + } + +} diff --git a/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java new file mode 100644 index 0000000..f75f5c7 --- /dev/null +++ b/druid-udf/src/main/java/org/apache/druid/query/udf/sql/CurrentTimestampMillisOperatorConversion.java @@ -0,0 +1,35 @@ +package org.apache.druid.query.udf.sql; + +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.*; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.OperatorConversions; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; +import org.apache.druid.sql.calcite.planner.PlannerContext; + +import javax.annotation.Nullable; + +public class CurrentTimestampMillisOperatorConversion implements SqlOperatorConversion { + private static final SqlFunction SQL_FUNCTION = OperatorConversions + .operatorBuilder("CURRENT_TIMESTAMP_MILLIS") + .operandTypes(SqlTypeFamily.ANY) + .requiredOperands(0) + .returnTypeNonNull(SqlTypeName.BIGINT) + .functionCategory(SqlFunctionCategory.USER_DEFINED_FUNCTION) + .build(); + + @Override + public SqlOperator calciteOperator() { + return SQL_FUNCTION; + } + + @Nullable + @Override + public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode) { + return OperatorConversions.convertDirectCall(plannerContext, rowSignature, rexNode, "current_timestamp_millis"); + } +} diff --git a/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java new file mode 100644 index 0000000..db83dd9 --- /dev/null +++ b/druid-udf/src/test/java/org/apache/druid/query/udf/expressions/CurrentTimestampMillisExprTest.java @@ -0,0 +1,148 @@ +package org.apache.druid.query.udf.expressions; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.math.expr.*; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Test; + +import java.util.Collections; + +public class CurrentTimestampMillisExprTest extends InitializedNullHandlingTest { + private final ExprMacroTable exprMacroTable = new ExprMacroTable(Collections.singletonList(new CurrentTimestampMillisExprMacro())); + Expr.ObjectBinding inputBindings = InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("string", InputBindings.inputSupplier(ExpressionType.STRING, () -> "abcdef")) + .put("long", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1234L)) + .put("double", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> 1.234)) + .put("array1", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"1", "2", "3"})) + .put("array2", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new String[]{"1", "2", "3"})) + .put("nullString", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("nullLong", InputBindings.inputSupplier(ExpressionType.LONG, () -> null)) + .put("nullDouble", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> null)) + .build() + ); + + Expr.ObjectBinding[] inputBindingArray = new Expr.ObjectBinding[]{ + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"5","7","8"})) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51")) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + // ... + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5,7,8")) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + InputBindings.forInputSuppliers( + new ImmutableMap.Builder() + .put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1")) + .put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L)) + .put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L)) + .put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51")) + .put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null)) + .build() + ), + }; + + @Test + public void test() throws Exception{ + Expr expr = Parser.parse("current_timestamp_millis()", exprMacroTable); + System.out.println(expr.analyzeInputs().getRequiredBindings()); + ExprEval eval = expr.eval(inputBindings); + System.out.println(eval.value()); + Thread.sleep(1000); + eval = expr.eval(inputBindings); + System.out.println(eval.value()); + Thread.sleep(1000); + expr = Parser.parse("current_timestamp_millis()", exprMacroTable); + eval = expr.eval(inputBindings); + System.out.println(eval.value()); + + } + + +}