From c6ca990f1fd87d1c9e4e215a44642f9dcc9893d6 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Wed, 11 Oct 2023 19:48:36 +0530 Subject: [PATCH] Rewrite EARLIEST/LATEST query operators to EARLIEST_BY/LATEST_BY (#15095) EARLIEST and LATEST operators implicitly reference the __time column for calculation of the aggregate value. Since the reference isn't explicit, Calcite sometimes fails to update the __time column name when there's column renaming --such as in the case of nested queries -- resulting in column not found errors. This change rewrites these operators to EARLIEST_BY and LATEST_BY during query processing to make the reference explicit to Calcite. --- .../EarliestLatestAnySqlAggregator.java | 100 ++++++++++++++++-- .../druid/sql/calcite/CalciteQueryTest.java | 29 +++++ 2 files changed, 123 insertions(+), 6 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java index abaeede99484..2e0316160277 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java @@ -23,15 +23,23 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.runtime.CalciteContextException; import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.InferTypes; import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.SqlReturnTypeInference; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.sql.util.SqlVisitor; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.calcite.util.Optionality; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidSqlInput; @@ -61,14 +69,21 @@ import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collections; import java.util.List; public class EarliestLatestAnySqlAggregator implements SqlAggregator { - public static final SqlAggregator EARLIEST = new EarliestLatestAnySqlAggregator(AggregatorType.EARLIEST); - public static final SqlAggregator LATEST = new EarliestLatestAnySqlAggregator(AggregatorType.LATEST); - public static final SqlAggregator ANY_VALUE = new EarliestLatestAnySqlAggregator(AggregatorType.ANY_VALUE); + public static final SqlAggregator EARLIEST = new EarliestLatestAnySqlAggregator( + AggregatorType.EARLIEST, + EarliestLatestBySqlAggregator.EARLIEST_BY.calciteFunction() + ); + public static final SqlAggregator LATEST = new EarliestLatestAnySqlAggregator( + AggregatorType.LATEST, + EarliestLatestBySqlAggregator.LATEST_BY.calciteFunction() + ); + public static final SqlAggregator ANY_VALUE = new EarliestLatestAnySqlAggregator(AggregatorType.ANY_VALUE, null); enum AggregatorType { @@ -161,10 +176,10 @@ abstract AggregatorFactory createAggregatorFactory( private final AggregatorType aggregatorType; private final SqlAggFunction function; - private EarliestLatestAnySqlAggregator(final AggregatorType aggregatorType) + private EarliestLatestAnySqlAggregator(final AggregatorType aggregatorType, final SqlAggFunction replacementAggFunc) { this.aggregatorType = aggregatorType; - this.function = new EarliestLatestSqlAggFunction(aggregatorType); + this.function = new EarliestLatestSqlAggFunction(aggregatorType, replacementAggFunc); } @Override @@ -305,12 +320,48 @@ public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding) } } + private static class TimeColIdentifer extends SqlIdentifier + { + + public TimeColIdentifer() + { + super("__time", SqlParserPos.ZERO); + } + + @Override + public R accept(SqlVisitor visitor) + { + + try { + return super.accept(visitor); + } + catch (CalciteContextException e) { + if (e.getCause() instanceof SqlValidatorException) { + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + e, + "Query could not be planned. A possible reason is [%s]", + "LATEST and EARLIEST aggregators implicitly depend on the __time column, but the " + + "table queried doesn't contain a __time column. Please use LATEST_BY or EARLIEST_BY " + + "and specify the column explicitly." + ); + + } else { + throw e; + } + } + } + } + private static class EarliestLatestSqlAggFunction extends SqlAggFunction { private static final EarliestLatestReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE = new EarliestLatestReturnTypeInference(0); - EarliestLatestSqlAggFunction(AggregatorType aggregatorType) + private final SqlAggFunction replacementAggFunc; + + EarliestLatestSqlAggFunction(AggregatorType aggregatorType, SqlAggFunction replacementAggFunc) { super( aggregatorType.name(), @@ -331,6 +382,43 @@ private static class EarliestLatestSqlAggFunction extends SqlAggFunction false, Optionality.FORBIDDEN ); + this.replacementAggFunc = replacementAggFunc; + } + + @Override + public SqlNode rewriteCall( + SqlValidator validator, + SqlCall call + ) + { + // Rewrite EARLIEST/LATEST to EARLIEST_BY/LATEST_BY to make + // reference to __time column explicit so that Calcite tracks it + + if (replacementAggFunc == null) { + return call; + } + + List operands = call.getOperandList(); + + SqlParserPos pos = call.getParserPosition(); + + if (operands.isEmpty() || operands.size() > 2) { + throw InvalidSqlInput.exception( + "Function [%s] expects 1 or 2 arguments but found [%s]", + getName(), + operands.size() + ); + } + + List newOperands = new ArrayList<>(); + newOperands.add(operands.get(0)); + newOperands.add(new TimeColIdentifer()); + + if (operands.size() == 2) { + newOperands.add(operands.get(1)); + } + + return replacementAggFunc.createCall(pos, newOperands); } } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 486a47ecd484..8659c1c29eff 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -676,6 +676,35 @@ public void testEarliestAggregators() ); } + @Test + public void testLatestToLatestByConversion() + { + msqIncompatible(); + testQuery( + "SELECT LATEST(dim1,10) FROM (SELECT DISTINCT __time, dim1 from foo)", + ImmutableList.of( + new GroupByQuery.Builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions( + new DefaultDimensionSpec("__time", "d0", ColumnType.LONG), + new DefaultDimensionSpec("dim1", "d1", ColumnType.STRING) + )) + .setContext(QUERY_CONTEXT_DEFAULT) + .build()) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs( + new StringLastAggregatorFactory("a0", "d1", "d0", 10)) + .setContext(QUERY_CONTEXT_DEFAULT) + .build()), + ImmutableList.of(new Object[]{"abc"}) + ); + } + @Test public void testLatestVectorAggregators() {