Skip to content

Commit

Permalink
Rewrite EARLIEST/LATEST query operators to EARLIEST_BY/LATEST_BY (apa…
Browse files Browse the repository at this point in the history
…che#15095)

The EARLIEST and LATEST operators implicitly reference the __time column when calculating the aggregate value. Because the reference isn't explicit, Calcite sometimes fails to update the __time column name when columns are renamed (for example, in nested queries), resulting in "column not found" errors.

This change rewrites these operators to EARLIEST_BY and LATEST_BY during query processing to make the reference explicit to Calcite.
  • Loading branch information
gargvishesh authored Oct 11, 2023
1 parent 52d94b0 commit c6ca990
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,23 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.util.SqlVisitor;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.util.Optionality;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
Expand Down Expand Up @@ -61,14 +69,21 @@
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EarliestLatestAnySqlAggregator implements SqlAggregator
{
public static final SqlAggregator EARLIEST = new EarliestLatestAnySqlAggregator(AggregatorType.EARLIEST);
public static final SqlAggregator LATEST = new EarliestLatestAnySqlAggregator(AggregatorType.LATEST);
public static final SqlAggregator ANY_VALUE = new EarliestLatestAnySqlAggregator(AggregatorType.ANY_VALUE);
public static final SqlAggregator EARLIEST = new EarliestLatestAnySqlAggregator(
AggregatorType.EARLIEST,
EarliestLatestBySqlAggregator.EARLIEST_BY.calciteFunction()
);
public static final SqlAggregator LATEST = new EarliestLatestAnySqlAggregator(
AggregatorType.LATEST,
EarliestLatestBySqlAggregator.LATEST_BY.calciteFunction()
);
public static final SqlAggregator ANY_VALUE = new EarliestLatestAnySqlAggregator(AggregatorType.ANY_VALUE, null);

enum AggregatorType
{
Expand Down Expand Up @@ -161,10 +176,10 @@ abstract AggregatorFactory createAggregatorFactory(
private final AggregatorType aggregatorType;
private final SqlAggFunction function;

private EarliestLatestAnySqlAggregator(final AggregatorType aggregatorType)
private EarliestLatestAnySqlAggregator(final AggregatorType aggregatorType, final SqlAggFunction replacementAggFunc)
{
this.aggregatorType = aggregatorType;
this.function = new EarliestLatestSqlAggFunction(aggregatorType);
this.function = new EarliestLatestSqlAggFunction(aggregatorType, replacementAggFunc);
}

@Override
Expand Down Expand Up @@ -305,12 +320,48 @@ public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
}
}

/**
 * Identifier for the {@code __time} column that is appended to the operand list when an
 * EARLIEST/LATEST call is rewritten into its *_BY form.
 *
 * The only behavior added on top of {@link SqlIdentifier} is error translation: if visiting this
 * identifier fails with a {@link CalciteContextException} whose cause is a
 * {@link SqlValidatorException}, the failure is rethrown as a {@link DruidException} carrying a hint
 * that the queried table may not have a {@code __time} column. Any other
 * {@link CalciteContextException} propagates unchanged.
 */
private static class TimeColIdentifer extends SqlIdentifier
{
  public TimeColIdentifer()
  {
    // This node is synthesized during rewrite, so it is created with the ZERO parser position.
    super("__time", SqlParserPos.ZERO);
  }

  @Override
  public <R> R accept(SqlVisitor<R> visitor)
  {
    try {
      return super.accept(visitor);
    }
    catch (CalciteContextException e) {
      final boolean causedByValidation = e.getCause() instanceof SqlValidatorException;
      if (!causedByValidation) {
        // Not a validation failure: leave the exception untouched.
        throw e;
      }

      final String possibleReason =
          "LATEST and EARLIEST aggregators implicitly depend on the __time column, but the "
          + "table queried doesn't contain a __time column. Please use LATEST_BY or EARLIEST_BY "
          + "and specify the column explicitly.";

      // Keep the original exception as the cause so the underlying Calcite error is not lost.
      throw DruidException.forPersona(DruidException.Persona.ADMIN)
                          .ofCategory(DruidException.Category.INVALID_INPUT)
                          .build(
                              e,
                              "Query could not be planned. A possible reason is [%s]",
                              possibleReason
                          );
    }
  }
}

private static class EarliestLatestSqlAggFunction extends SqlAggFunction
{
private static final EarliestLatestReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE =
new EarliestLatestReturnTypeInference(0);

EarliestLatestSqlAggFunction(AggregatorType aggregatorType)
private final SqlAggFunction replacementAggFunc;

EarliestLatestSqlAggFunction(AggregatorType aggregatorType, SqlAggFunction replacementAggFunc)
{
super(
aggregatorType.name(),
Expand All @@ -331,6 +382,43 @@ private static class EarliestLatestSqlAggFunction extends SqlAggFunction
false,
Optionality.FORBIDDEN
);
this.replacementAggFunc = replacementAggFunc;
}

/**
 * Rewrites an EARLIEST/LATEST call into the configured replacement aggregate function, inserting an
 * explicit {@code __time} identifier as the second operand so that Calcite tracks the column
 * reference.
 *
 * The operand layout after rewriting is {@code (expr, __time)} for a one-argument call and
 * {@code (expr, __time, secondArg)} for a two-argument call. When no replacement function was
 * configured ({@code replacementAggFunc} is null) the call is returned untouched.
 *
 * @param validator the validator driving the rewrite; not used by this implementation
 * @param call      the original call to this function
 * @return the original call, or a new call to the replacement function at the same parser position
 * @throws org.apache.druid.error.DruidException (via {@link InvalidSqlInput}) if the call does not
 *                                               have exactly 1 or 2 operands
 */
@Override
public SqlNode rewriteCall(SqlValidator validator, SqlCall call)
{
  if (replacementAggFunc == null) {
    // Nothing to rewrite to; keep the call exactly as written.
    return call;
  }

  final List<SqlNode> originalArgs = call.getOperandList();
  final int argCount = originalArgs.size();

  if (argCount < 1 || argCount > 2) {
    throw InvalidSqlInput.exception(
        "Function [%s] expects 1 or 2 arguments but found [%s]",
        getName(),
        argCount
    );
  }

  final List<SqlNode> rewrittenArgs = new ArrayList<>();
  // The expression being aggregated always stays first.
  rewrittenArgs.add(originalArgs.get(0));
  // The explicit time column goes second, matching the *_BY operand order used here.
  rewrittenArgs.add(new TimeColIdentifer());
  if (argCount == 2) {
    // The optional second argument of the original call shifts to third position.
    rewrittenArgs.add(originalArgs.get(1));
  }

  return replacementAggFunc.createCall(call.getParserPosition(), rewrittenArgs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,35 @@ public void testEarliestAggregators()
);
}

/**
 * Checks that LATEST over a subquery is planned with an explicit time column: the outer
 * aggregator is expected to be a {@link StringLastAggregatorFactory} that reads the value from
 * the subquery's {@code d1} output and the time from its {@code d0} output.
 */
@Test
public void testLatestToLatestByConversion()
{
  msqIncompatible();

  // Expected native form of the subquery "SELECT DISTINCT __time, dim1 from foo":
  // a group-by on both columns, exposing them as d0 (time) and d1 (dim1).
  final GroupByQuery distinctTimeAndDim1 =
      GroupByQuery.builder()
                  .setDataSource(CalciteTests.DATASOURCE1)
                  .setInterval(querySegmentSpec(Filtration.eternity()))
                  .setGranularity(Granularities.ALL)
                  .setDimensions(
                      dimensions(
                          new DefaultDimensionSpec("__time", "d0", ColumnType.LONG),
                          new DefaultDimensionSpec("dim1", "d1", ColumnType.STRING)
                      )
                  )
                  .setContext(QUERY_CONTEXT_DEFAULT)
                  .build();

  testQuery(
      "SELECT LATEST(dim1,10) FROM (SELECT DISTINCT __time, dim1 from foo)",
      ImmutableList.of(
          new GroupByQuery.Builder()
              .setDataSource(distinctTimeAndDim1)
              .setInterval(querySegmentSpec(Filtration.eternity()))
              .setGranularity(Granularities.ALL)
              .setAggregatorSpecs(new StringLastAggregatorFactory("a0", "d1", "d0", 10))
              .setContext(QUERY_CONTEXT_DEFAULT)
              .build()
      ),
      ImmutableList.of(new Object[]{"abc"})
  );
}

@Test
public void testLatestVectorAggregators()
{
Expand Down

0 comments on commit c6ca990

Please sign in to comment.