TSG-22756 添加current_timestamp_millis udf输出当前时间戳

This commit is contained in:
lifengchao
2024-10-17 18:26:38 +08:00
parent b754e83ba0
commit 5765edf671
4 changed files with 244 additions and 0 deletions

View File

@@ -3,7 +3,9 @@ package org.apache.druid.query.udf;
import com.google.inject.Binder; import com.google.inject.Binder;
import org.apache.druid.guice.ExpressionModule; import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.initialization.DruidModule; import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.udf.expressions.CurrentTimestampMillisExprMacro;
import org.apache.druid.query.udf.expressions.DimensionBucketExprMacro; import org.apache.druid.query.udf.expressions.DimensionBucketExprMacro;
import org.apache.druid.query.udf.sql.CurrentTimestampMillisOperatorConversion;
import org.apache.druid.query.udf.sql.DimensionBucketOperatorConversion; import org.apache.druid.query.udf.sql.DimensionBucketOperatorConversion;
import org.apache.druid.sql.guice.SqlBindings; import org.apache.druid.sql.guice.SqlBindings;
@@ -11,7 +13,9 @@ public class UdfModule implements DruidModule {
@Override @Override
public void configure(Binder binder) { public void configure(Binder binder) {
SqlBindings.addOperatorConversion(binder, DimensionBucketOperatorConversion.class); SqlBindings.addOperatorConversion(binder, DimensionBucketOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, CurrentTimestampMillisOperatorConversion.class);
ExpressionModule.addExprMacro(binder, DimensionBucketExprMacro.class); ExpressionModule.addExprMacro(binder, DimensionBucketExprMacro.class);
ExpressionModule.addExprMacro(binder, CurrentTimestampMillisExprMacro.class);
} }
/*@Override /*@Override

View File

@@ -0,0 +1,57 @@
package org.apache.druid.query.udf.expressions;
import org.apache.druid.math.expr.*;
import javax.annotation.Nullable;
import java.util.List;
public class CurrentTimestampMillisExprMacro implements ExprMacroTable.ExprMacro {
private static final String NAME = "current_timestamp_millis"; // current_timestamp_millis
@Override
public String name() {
return NAME;
}
@Override
public Expr apply(List<Expr> args) {
validationHelperCheckArgumentCount(args, 0);
class CurrentTimestampMillisExpr implements Expr {
@Override
public ExprEval eval(ObjectBinding bindings) {
return ExprEval.of(System.currentTimeMillis());
}
@Override
public String stringify() {
return "current_timestamp_millis";
}
@Override
public Expr visit(Shuttle shuttle) {
return shuttle.visit(this);
}
@Override
public BindingAnalysis analyzeInputs() {
return BindingAnalysis.EMTPY;
}
@Nullable
@Override
public ExpressionType getOutputType(InputBindingInspector inspector) {
return ExpressionType.LONG;
}
@Override
public boolean canVectorize(InputBindingInspector inspector) {
return false;
}
}
return new CurrentTimestampMillisExpr();
}
}

View File

@@ -0,0 +1,35 @@
package org.apache.druid.query.udf.sql;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.*;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
public class CurrentTimestampMillisOperatorConversion implements SqlOperatorConversion {
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder("CURRENT_TIMESTAMP_MILLIS")
.operandTypes(SqlTypeFamily.ANY)
.requiredOperands(0)
.returnTypeNonNull(SqlTypeName.BIGINT)
.functionCategory(SqlFunctionCategory.USER_DEFINED_FUNCTION)
.build();
@Override
public SqlOperator calciteOperator() {
return SQL_FUNCTION;
}
@Nullable
@Override
public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode) {
return OperatorConversions.convertDirectCall(plannerContext, rowSignature, rexNode, "current_timestamp_millis");
}
}

View File

@@ -0,0 +1,148 @@
package org.apache.druid.query.udf.expressions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.math.expr.*;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Test;
import java.util.Collections;
public class CurrentTimestampMillisExprTest extends InitializedNullHandlingTest {
private final ExprMacroTable exprMacroTable = new ExprMacroTable(Collections.singletonList(new CurrentTimestampMillisExprMacro()));
Expr.ObjectBinding inputBindings = InputBindings.forInputSuppliers(
new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
.put("string", InputBindings.inputSupplier(ExpressionType.STRING, () -> "abcdef"))
.put("long", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1234L))
.put("double", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> 1.234))
.put("array1", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"1", "2", "3"}))
.put("array2", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new String[]{"1", "2", "3"}))
.put("nullString", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("nullLong", InputBindings.inputSupplier(ExpressionType.LONG, () -> null))
.put("nullDouble", InputBindings.inputSupplier(ExpressionType.DOUBLE, () -> null))
.build()
);
Expr.ObjectBinding[] inputBindingArray = new Expr.ObjectBinding[]{
InputBindings.forInputSuppliers(
new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
.put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
.put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
.put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
.put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
.put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
.put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
.put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
.put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
.put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.build()
),
InputBindings.forInputSuppliers(
new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
.put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
.put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
.put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
.put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
.put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
.put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> new Object[]{"5","7","8"}))
.put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
.put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
.put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.build()
),
InputBindings.forInputSuppliers(
new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
.put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
.put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
.put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
.put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
.put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L))
.put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
.put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
.put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING_ARRAY, () -> null))
.put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51"))
.put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.build()
),
// ...
InputBindings.forInputSuppliers(
new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
.put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
.put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
.put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
.put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 81))
.put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
.put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.build()
),
InputBindings.forInputSuppliers(
new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
.put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
.put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
.put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
.put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 101))
.put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1))
.put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5,7,8"))
.put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.build()
),
InputBindings.forInputSuppliers(
new ImmutableMap.Builder<String, InputBindings.InputSupplier>()
.put("device_id", InputBindings.inputSupplier(ExpressionType.STRING, () -> "1"))
.put("rule_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
.put("template_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
.put("chart_id", InputBindings.inputSupplier(ExpressionType.LONG, () -> 271L))
.put("version", InputBindings.inputSupplier(ExpressionType.LONG, () -> 1L))
.put("client_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_ip_object", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("fqdn_category", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("client_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_ip", InputBindings.inputSupplier(ExpressionType.STRING, () -> "5.245.228.51"))
.put("server_fqdn", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("server_domain", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.put("application", InputBindings.inputSupplier(ExpressionType.STRING, () -> null))
.build()
),
};
@Test
public void test() throws Exception{
Expr expr = Parser.parse("current_timestamp_millis()", exprMacroTable);
System.out.println(expr.analyzeInputs().getRequiredBindings());
ExprEval eval = expr.eval(inputBindings);
System.out.println(eval.value());
Thread.sleep(1000);
eval = expr.eval(inputBindings);
System.out.println(eval.value());
Thread.sleep(1000);
expr = Parser.parse("current_timestamp_millis()", exprMacroTable);
eval = expr.eval(inputBindings);
System.out.println(eval.value());
}
}