Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[multistage] Initial (phase 1) Query runtime for window functions with ORDER BY within the OVER() clause #10449

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2165,6 +2165,24 @@
"sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(ORDER BY a.col3 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
"expectedException": "Error explain query plan for.*"
},
{
somandal marked this conversation as resolved.
Show resolved Hide resolved
"description": "unsupported custom frames - ORDER BY with two columns and RANGE",
"notes": "Apache Calcite throws error: RANGE clause cannot be used with compound ORDER BY clause, even though omitting an explicit frame specification defaults to RANGE",
"sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(ORDER BY a.col3, a.col1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
"expectedException": "Error explain query plan for.*"
},
{
"description": "unsupported custom frames - PARTITION BY and ORDER BY with two columns and RANGE",
"notes": "Apache Calcite throws error: RANGE clause cannot be used with compound ORDER BY clause, even though omitting an explicit frame specification defaults to RANGE",
"sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col3, a.col1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
"expectedException": "Error explain query plan for.*"
},
{
"description": "unsupported custom frames - ORDER BY with two columns and ROWS",
"notes": "not yet supported",
"sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(ORDER BY a.col3, a.col1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
"expectedException": "Error explain query plan for.*"
},
{
"description": "Multiple window groups",
"notes": "not yet supported",
Expand All @@ -2189,6 +2207,12 @@
"sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2), MIN(a.col3) OVER(PARTITION BY a.col2) FROM a",
"expectedException": "Error explain query plan for.*"
},
{
"description": "Multiple window groups",
"notes": "not yet supported",
"sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2, a.col1), MIN(a.col3) OVER(ORDER BY a.col1, a.col2) FROM a",
"expectedException": "Error explain query plan for.*"
},
{
"description": "Using aggregation inside ORDER BY within OVER",
"sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY MAX(a.col3)) FROM a",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -57,16 +58,20 @@
* Unlike the AggregateOperator which will output one row per group, the WindowAggregateOperator
* will output as many rows as input rows.
*
* For queries using an 'ORDER BY' clause within the 'OVER()', this WindowAggregateOperator expects that the incoming
* keys are already ordered based on the 'ORDER BY' keys. No ordering is performed in this operator. The planner
* should handle adding a 'SortExchange' to do the ordering prior to pipelining the data to the upstream operators
* wherever ordering is required.
*
* Note: This class performs aggregation over the double value of input.
* If the input is single value, the output type will be input type. Otherwise, the output type will be double.
*
* TODO:
* 1. Add support for OVER() clause with ORDER BY only or PARTITION BY ORDER BY
* 2. Add support for rank window functions
* 3. Add support for value window functions
* 4. Add support for custom frames
* 5. Add support for null direction handling (even for PARTITION BY only queries with custom null direction)
* 6. Add support for multiple window groups (each WindowAggregateOperator should still work on a single group)
* 1. Add support for rank window functions
* 2. Add support for value window functions
* 3. Add support for custom frames (including ROWS support)
* 4. Add support for null direction handling (even for PARTITION BY only queries with custom null direction)
* 5. Add support for multiple window groups (each WindowAggregateOperator should still work on a single group)
*/
public class WindowAggregateOperator extends MultiStageOperator {
private static final String EXPLAIN_NAME = "WINDOW";
Expand All @@ -79,8 +84,9 @@ public class WindowAggregateOperator extends MultiStageOperator {
private final List<RexExpression.FunctionCall> _aggCalls;
private final List<RexExpression> _constants;
private final DataSchema _resultSchema;
private final AggregationUtils.Accumulator[] _windowAccumulators;
private final WindowAggregateAccumulator[] _windowAccumulators;
private final Map<Key, List<Object[]>> _partitionRows;
private final boolean _isPartitionByOnly;

private TransferableBlock _upstreamErrorBlock;

Expand All @@ -94,7 +100,7 @@ public WindowAggregateOperator(OpChainExecutionContext context, MultiStageOperat
int upperBound, WindowNode.WindowFrameType windowFrameType, List<RexExpression> constants,
DataSchema resultSchema, DataSchema inputSchema) {
this(context, inputOperator, groupSet, orderSet, orderSetDirection, orderSetNullDirection, aggCalls, lowerBound,
upperBound, windowFrameType, constants, resultSchema, inputSchema, AggregationUtils.Accumulator.MERGERS);
upperBound, windowFrameType, constants, resultSchema, inputSchema, WindowAggregateAccumulator.WIN_AGG_MERGERS);
}

@VisibleForTesting
Expand All @@ -106,37 +112,33 @@ public WindowAggregateOperator(OpChainExecutionContext context, MultiStageOperat
Map<String, Function<DataSchema.ColumnDataType, AggregationUtils.Merger>> mergers) {
super(context);

boolean isPartitionByOnly = isPartitionByOnlyQuery(groupSet, orderSet, orderSetDirection, orderSetNullDirection);
Preconditions.checkState(orderSet == null || orderSet.isEmpty() || isPartitionByOnly,
"Order by is not yet supported in window functions");

_inputOperator = inputOperator;
_groupSet = groupSet;
_orderSetInfo = new OrderSetInfo(orderSet, orderSetDirection, orderSetNullDirection);
_isPartitionByOnly = isPartitionByOnlyQuery(groupSet, orderSet);
_orderSetInfo = new OrderSetInfo(orderSet, orderSetDirection, orderSetNullDirection, _isPartitionByOnly);
_windowFrame = new WindowFrame(lowerBound, upperBound, windowFrameType);

Preconditions.checkState(_windowFrame.getWindowFrameType() == WindowNode.WindowFrameType.RANGE,
"Only RANGE type frames are supported at present");
Preconditions.checkState(_windowFrame.isUnboundedPreceding(),
"Only default frame is supported, lowerBound must be UNBOUNDED PRECEDING");
Preconditions.checkState(_windowFrame.isUnboundedFollowing()
|| (_windowFrame.isUpperBoundCurrentRow() && isPartitionByOnly),
Preconditions.checkState(_windowFrame.isUnboundedFollowing() || _windowFrame.isUpperBoundCurrentRow(),
"Only default frame is supported, upperBound must be UNBOUNDED FOLLOWING or CURRENT ROW");

// we expect all agg calls to be aggregate function calls
_aggCalls = aggCalls.stream().map(RexExpression.FunctionCall.class::cast).collect(Collectors.toList());
_constants = constants;
_resultSchema = resultSchema;

_windowAccumulators = new AggregationUtils.Accumulator[_aggCalls.size()];
_windowAccumulators = new WindowAggregateAccumulator[_aggCalls.size()];
int aggCallsSize = _aggCalls.size();
for (int i = 0; i < aggCallsSize; i++) {
RexExpression.FunctionCall agg = _aggCalls.get(i);
String functionName = agg.getFunctionName();
if (!mergers.containsKey(functionName)) {
throw new IllegalStateException("Unexpected aggregation function name: " + functionName);
}
_windowAccumulators[i] = new AggregationUtils.Accumulator(agg, mergers, functionName, inputSchema);
_windowAccumulators[i] = new WindowAggregateAccumulator(agg, mergers, functionName, inputSchema, _orderSetInfo);
}

_partitionRows = new HashMap<>();
Expand Down Expand Up @@ -180,9 +182,7 @@ protected TransferableBlock getNextBlock() {
}
}

private boolean isPartitionByOnlyQuery(List<RexExpression> groupSet, List<RexExpression> orderSet,
List<RelFieldCollation.Direction> orderSetDirection,
List<RelFieldCollation.NullDirection> orderSetNullDirection) {
private boolean isPartitionByOnlyQuery(List<RexExpression> groupSet, List<RexExpression> orderSet) {
if (CollectionUtils.isEmpty(orderSet)) {
return true;
}
Expand All @@ -203,15 +203,18 @@ private boolean isPartitionByOnlyQuery(List<RexExpression> groupSet, List<RexExp
}

private TransferableBlock produceWindowAggregatedBlock() {
Key emptyOrderKey = AggregationUtils.extractEmptyKey();
List<Object[]> rows = new ArrayList<>(_numRows);
for (Map.Entry<Key, List<Object[]>> e : _partitionRows.entrySet()) {
Key partitionKey = e.getKey();
List<Object[]> rowList = e.getValue();
for (Object[] existingRow : rowList) {
Object[] row = new Object[existingRow.length + _aggCalls.size()];
Key orderKey = _isPartitionByOnly ? emptyOrderKey
: AggregationUtils.extractRowKey(existingRow, _orderSetInfo.getOrderSet());
System.arraycopy(existingRow, 0, row, 0, existingRow.length);
for (int i = 0; i < _windowAccumulators.length; i++) {
row[i + existingRow.length] = _windowAccumulators[i].getResults().get(partitionKey);
row[i + existingRow.length] = _windowAccumulators[i].getResultForKeys(partitionKey, orderKey);
}
rows.add(row);
}
Expand All @@ -228,6 +231,7 @@ private TransferableBlock produceWindowAggregatedBlock() {
* @return whether or not the operator is ready to move on (EOS or ERROR)
*/
private boolean consumeInputBlocks() {
Key emptyOrderKey = AggregationUtils.extractEmptyKey();
TransferableBlock block = _inputOperator.nextBlock();
while (!block.isNoOpBlock()) {
// setting upstream error block
Expand All @@ -242,13 +246,14 @@ private boolean consumeInputBlocks() {
List<Object[]> container = block.getContainer();
for (Object[] row : container) {
_numRows++;
// TODO: Revisit the aggregation logic once ORDER BY inside OVER() support is added, also revisit null direction
// handling for all query types
// TODO: Revisit null direction handling for all query types
Key key = AggregationUtils.extractRowKey(row, _groupSet);
Key orderKey = _isPartitionByOnly ? emptyOrderKey
: AggregationUtils.extractRowKey(row, _orderSetInfo.getOrderSet());
_partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
int aggCallsSize = _aggCalls.size();
for (int i = 0; i < aggCallsSize; i++) {
_windowAccumulators[i].accumulate(key, row);
_windowAccumulators[i].accumulate(key, orderKey, row);
}
}
block = _inputOperator.nextBlock();
Expand All @@ -266,12 +271,15 @@ private static class OrderSetInfo {
final List<RelFieldCollation.Direction> _orderSetDirection;
// List of null direction for each key
final List<RelFieldCollation.NullDirection> _orderSetNullDirection;
// Set to 'true' if this is a partition by only query
final boolean _isPartitionByOnly;

OrderSetInfo(List<RexExpression> orderSet, List<RelFieldCollation.Direction> orderSetDirection,
List<RelFieldCollation.NullDirection> orderSetNullDirection) {
List<RelFieldCollation.NullDirection> orderSetNullDirection, boolean isPartitionByOnly) {
_orderSet = orderSet;
_orderSetDirection = orderSetDirection;
_orderSetNullDirection = orderSetNullDirection;
_isPartitionByOnly = isPartitionByOnly;
}

List<RexExpression> getOrderSet() {
Expand All @@ -285,6 +293,10 @@ List<RelFieldCollation.Direction> getOrderSetDirection() {
List<RelFieldCollation.NullDirection> getOrderSetNullDirection() {
return _orderSetNullDirection;
}

boolean isPartitionByOnly() {
return _isPartitionByOnly;
}
}

/**
Expand Down Expand Up @@ -329,4 +341,88 @@ int getUpperBound() {
return _upperBound;
}
}

private static class WindowAggregateAccumulator extends AggregationUtils.Accumulator {
  // Mergers for window aggregate functions. Currently identical to the base aggregation mergers;
  // kept as a separate map so window-specific merge functions can be registered here later.
  private static final Map<String, Function<DataSchema.ColumnDataType, AggregationUtils.Merger>> WIN_AGG_MERGERS =
      ImmutableMap.<String, Function<DataSchema.ColumnDataType, AggregationUtils.Merger>>builder()
          .putAll(AggregationUtils.Accumulator.MERGERS)
          .build();

  // Running aggregate per partition key, broken down by ORDER BY key. Only populated for queries
  // that have a non-trivial ORDER BY; PARTITION BY only queries use the base class' _results map.
  private final Map<Key, OrderKeyResult> _orderByResults = new HashMap<>();
  // 'true' when there is no ORDER BY, or the ORDER BY is equivalent to the PARTITION BY.
  private final boolean _isPartitionByOnly;
  // NOTE(review): currently unused within this class; retained for parity with callers that use
  // AggregationUtils.extractEmptyKey() as the order key for partition-by-only rows.
  private final Key _emptyOrderKey;

  WindowAggregateAccumulator(RexExpression.FunctionCall aggCall, Map<String,
      Function<DataSchema.ColumnDataType, AggregationUtils.Merger>> merger, String functionName,
      DataSchema inputSchema, OrderSetInfo orderSetInfo) {
    super(aggCall, merger, functionName, inputSchema);
    _isPartitionByOnly = CollectionUtils.isEmpty(orderSetInfo.getOrderSet()) || orderSetInfo.isPartitionByOnly();
    _emptyOrderKey = AggregationUtils.extractEmptyKey();
  }

  /**
   * Accumulates the given row into the running aggregate for the (partition key, order key) pair.
   * Rows are expected to arrive already sorted on the ORDER BY keys (the planner adds a
   * SortExchange upstream), so the most recently seen order key always holds the latest running
   * value for the partition.
   */
  public void accumulate(Key key, Key orderKey, Object[] row) {
    if (_isPartitionByOnly) {
      // No effective ORDER BY: delegate to the base per-partition accumulation.
      accumulate(key, row);
      return;
    }

    // TODO: fix that single agg result (original type) has different type from multiple agg results (double).
    Object value = _inputRef == -1 ? _literal : row[_inputRef];
    OrderKeyResult orderKeyResult = _orderByResults.computeIfAbsent(key, k -> new OrderKeyResult());
    Key previousOrderKey = orderKeyResult.getPreviousOrderByKey();
    // The latest running value for this partition lives under the previously seen order key
    // (null for the first row of the partition). Because the input is ordered, this is correct
    // whether or not the current row shares the previous row's order key.
    Object currentRes = previousOrderKey == null ? null : orderKeyResult.getOrderByResults().get(previousOrderKey);
    Object newResult = currentRes == null ? _merger.initialize(value, _dataType) : _merger.merge(currentRes, value);
    orderKeyResult.addOrderByResult(orderKey, newResult);
  }

  /**
   * Returns the final aggregate value for the given partition key and order key. For PARTITION BY
   * only queries the order key is ignored and the base per-partition result is returned.
   */
  public Object getResultForKeys(Key key, Key orderKey) {
    if (_isPartitionByOnly) {
      return _results.get(key);
    } else {
      return _orderByResults.get(key).getOrderByResults().get(orderKey);
    }
  }

  public Map<Key, OrderKeyResult> getOrderByResults() {
    return _orderByResults;
  }

  /**
   * Holds the per-ORDER-BY-key running results for a single partition, along with the most
   * recently inserted order key (valid because the input arrives pre-sorted on the ORDER BY keys).
   */
  static class OrderKeyResult {
    final Map<Key, Object> _orderByResults;
    Key _previousOrderByKey;

    OrderKeyResult() {
      _orderByResults = new HashMap<>();
      _previousOrderByKey = null;
    }

    public void addOrderByResult(Key orderByKey, Object value) {
      // We expect to get the rows in order based on the ORDER BY key so it is safe to blindly assign the
      // current key as the previous key
      _orderByResults.put(orderByKey, value);
      _previousOrderByKey = orderByKey;
    }

    public Map<Key, Object> getOrderByResults() {
      return _orderByResults;
    }

    public Key getPreviousOrderByKey() {
      return _previousOrderByKey;
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public static Key extractRowKey(Object[] row, List<RexExpression> groupSet) {
return new Key(keyElements);
}

/**
 * Returns a {@link Key} containing no elements, used as the placeholder key when a row has no
 * grouping / ordering columns to extract.
 */
public static Key extractEmptyKey() {
  Object[] noElements = new Object[0];
  return new Key(noElements);
}

private static Object mergeSum(Object left, Object right) {
return ((Number) left).doubleValue() + ((Number) right).doubleValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,17 +402,20 @@ public void testShouldThrowOnUnknownRankAggFunction() {
WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema);
}

@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Order by is not yet "
+ "supported in window functions")
public void testShouldThrowOnNonEmptyOrderByKeysNotMatchingPartitionByKeys() {
// TODO: Remove this test once order by support is added
@Test
public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys() {
// Given:
List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(0)));
List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
List<RexExpression> order = ImmutableList.of(new RexExpression.InputRef(1));

DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new DataSchema.ColumnDataType[]{INT, STRING});
Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"}))
// Input should be in sorted order on the order by key as SortExchange will handle pre-sorting the data
Mockito.when(_input.nextBlock())
.thenReturn(OperatorTestUtil.block(inSchema, new Object[]{3, "and"}, new Object[]{2, "bar"},
new Object[]{2, "foo"}))
.thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, "foo"}, new Object[]{2, "foo"},
new Object[]{3, "true"}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());

DataSchema outSchema =
Expand All @@ -422,10 +425,28 @@ public void testShouldThrowOnNonEmptyOrderByKeysNotMatchingPartitionByKeys() {
Arrays.asList(RelFieldCollation.Direction.ASCENDING), Arrays.asList(RelFieldCollation.NullDirection.LAST),
calls, Integer.MIN_VALUE, Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, Collections.emptyList(),
outSchema, inSchema);

TransferableBlock result = operator.getNextBlock();
while (result.isNoOpBlock()) {
result = operator.getNextBlock();
}
TransferableBlock eosBlock = operator.getNextBlock();
List<Object[]> resultRows = result.getContainer();
List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "foo", 1}, new Object[]{2, "bar", 2},
new Object[]{2, "foo", 6.0}, new Object[]{2, "foo", 6.0}, new Object[]{3, "and", 3},
new Object[]{3, "true", 6.0});
Assert.assertEquals(resultRows.size(), expectedRows.size());
Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
Assert.assertEquals(resultRows.get(2), expectedRows.get(2));
Assert.assertEquals(resultRows.get(3), expectedRows.get(3));
Assert.assertEquals(resultRows.get(4), expectedRows.get(4));
Assert.assertEquals(resultRows.get(5), expectedRows.get(5));
Assert.assertTrue(eosBlock.isEndOfStreamBlock(), "Second block is EOS (done processing)");
}

@Test
public void testShouldThrowOnNonEmptyOrderByKeysMatchingPartitionByKeysWithDifferentDirection() {
public void testNonEmptyOrderByKeysMatchingPartitionByKeysWithDifferentDirection() {
// Given:
// Set ORDER BY key same as PARTITION BY key with custom direction and null direction. Should still be treated
// like a PARTITION BY only query (since the final aggregation value won't change).
Expand Down
Loading