Skip to content

Commit

Permalink
SQL: Fix ORDER BY on aggregates and GROUPed BY fields (#51894)
Browse files Browse the repository at this point in the history
Previously, in the in-memory sorting module
`LocalAggregationSorterListener`, only the aggregate functions were used
(grabbed by the `sortingColumns`). As a consequence, if the ORDER BY
also used columns of the GROUP BY clause (especially when those columns
had higher priority, i.e. appeared before the aggregate functions), wrong
results were produced. E.g.:
```
SELECT gender, MAX(salary) AS max FROM test_emp
GROUP BY gender
ORDER BY gender, max
```

Add all columns of the ORDER BY to the `sortingColumns` so that the
`LocalAggregationSorterListener` can use the correct comparators in
the underlying PriorityQueue used to implement the in-memory sorting.

Fixes: #50355
(cherry picked from commit be680af)
  • Loading branch information
matriv committed Feb 12, 2020
1 parent 6075a77 commit 45ec73d
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 84 deletions.
48 changes: 47 additions & 1 deletion x-pack/plugin/sql/qa/src/main/resources/agg-ordering.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,50 @@ g:s | gender:s | s3:i | SUM(salary):i | s5:i
M |M |2671054|2671054 |2671054
F |F |1666196|1666196 |1666196
null |null |487605 |487605 |487605
;
;

histogramDateTimeWithCountAndOrder_1
schema::h:ts|c:l
SELECT HISTOGRAM(birth_date, INTERVAL 1 YEAR) AS h, COUNT(*) as c FROM test_emp GROUP BY h ORDER BY h DESC, c ASC;

h | c
------------------------+---------------
1965-01-01T00:00:00.000Z|1
1964-01-01T00:00:00.000Z|4
1963-01-01T00:00:00.000Z|7
1962-01-01T00:00:00.000Z|6
1961-01-01T00:00:00.000Z|8
1960-01-01T00:00:00.000Z|8
1959-01-01T00:00:00.000Z|9
1958-01-01T00:00:00.000Z|7
1957-01-01T00:00:00.000Z|4
1956-01-01T00:00:00.000Z|5
1955-01-01T00:00:00.000Z|4
1954-01-01T00:00:00.000Z|8
1953-01-01T00:00:00.000Z|11
1952-01-01T00:00:00.000Z|8
null |10
;

histogramDateTimeWithCountAndOrder_2
schema::h:ts|c:l
SELECT HISTOGRAM(birth_date, INTERVAL 1 YEAR) AS h, COUNT(*) as c FROM test_emp GROUP BY h ORDER BY c DESC, h ASC;

h | c
------------------------+---------------
1953-01-01T00:00:00.000Z|11
null |10
1959-01-01T00:00:00.000Z|9
1952-01-01T00:00:00.000Z|8
1954-01-01T00:00:00.000Z|8
1960-01-01T00:00:00.000Z|8
1961-01-01T00:00:00.000Z|8
1958-01-01T00:00:00.000Z|7
1963-01-01T00:00:00.000Z|7
1962-01-01T00:00:00.000Z|6
1956-01-01T00:00:00.000Z|5
1955-01-01T00:00:00.000Z|4
1957-01-01T00:00:00.000Z|4
1964-01-01T00:00:00.000Z|4
1965-01-01T00:00:00.000Z|1
;
27 changes: 24 additions & 3 deletions x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ aggNotSpecifiedInTheAggregateAndGroupWithHavingWithLimitAndDirection
SELECT gender, MIN(salary) AS min, COUNT(*) AS c FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY MAX(salary) ASC, c DESC LIMIT 5;

groupAndAggNotSpecifiedInTheAggregateWithHaving
SELECT gender, MIN(salary) AS min, COUNT(*) AS c FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY gender, MAX(salary);
SELECT gender, MIN(salary) AS min, COUNT(*) AS c FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY gender NULLS FIRST, MAX(salary);

multipleAggsThatGetRewrittenWithAliasOnAMediumGroupBy
SELECT languages, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY languages ORDER BY max;
Expand Down Expand Up @@ -134,5 +134,26 @@ SELECT gender AS g, first_name AS f, last_name AS l FROM test_emp GROUP BY f, ge
multipleGroupingsAndOrderingByGroups_8
SELECT gender AS g, first_name, last_name FROM test_emp GROUP BY g, last_name, first_name ORDER BY gender ASC, first_name DESC, last_name ASC;

multipleGroupingsAndOrderingByGroupsWithFunctions
SELECT first_name f, last_name l, gender g, CONCAT(first_name, last_name) c FROM test_emp GROUP BY gender, l, f, c ORDER BY gender, c DESC, first_name, last_name ASC;
multipleGroupingsAndOrderingByGroupsAndAggs_1
SELECT gender, MIN(salary) AS min, COUNT(*) AS c, MAX(salary) AS max FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY gender ASC NULLS FIRST, MAX(salary) DESC;

multipleGroupingsAndOrderingByGroupsAndAggs_2
SELECT gender, MIN(salary) AS min, COUNT(*) AS c, MAX(salary) AS max FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY gender DESC NULLS LAST, MAX(salary) ASC;

multipleGroupingsAndOrderingByGroupsWithFunctions_1
SELECT first_name f, last_name l, gender g, CONCAT(first_name, last_name) c FROM test_emp GROUP BY gender, l, f, c ORDER BY gender NULLS FIRST, c DESC, first_name, last_name ASC;

multipleGroupingsAndOrderingByGroupsWithFunctions_2
SELECT first_name f, last_name l, gender g, CONCAT(first_name, last_name) c FROM test_emp GROUP BY gender, l, f, c ORDER BY c DESC, gender DESC NULLS LAST, first_name, last_name ASC;

multipleGroupingsAndOrderingByGroupsAndAggregatesWithFunctions_1
SELECT CONCAT('foo', gender) g, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY g ORDER BY 1 NULLS FIRST, 2, 3;

multipleGroupingsAndOrderingByGroupsAndAggregatesWithFunctions_2
SELECT CONCAT('foo', gender) g, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY g ORDER BY 1 DESC NULLS LAST, 2, 3;

multipleGroupingsAndOrderingByGroupsAndAggregatesWithFunctions_3
SELECT CONCAT('foo', gender) g, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY g ORDER BY 2, 1 NULLS FIRST, 3;

multipleGroupingsAndOrderingByGroupsAndAggregatesWithFunctions_4
SELECT CONCAT('foo', gender) g, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY g ORDER BY 3 DESC, 1 NULLS FIRST, 2;
Original file line number Diff line number Diff line change
Expand Up @@ -578,36 +578,51 @@ static class AggSortingQueue extends PriorityQueue<Tuple<List<?>, Integer>> {
this.sortingColumns = sortingColumns;
}

// compare row based on the received attribute sort
// if a sort item is not in the list, it is assumed the sorting happened in ES
// and the results are left as is (by using the row ordering), otherwise it is sorted based on the given criteria.
//
// Take for example ORDER BY a, x, b, y
// a, b - are sorted in ES
// x, y - need to be sorted client-side
// sorting on x kicks in, only if the values for a are equal.

/**
* Compare row based on the received attribute sort
* <ul>
* <li>
* If a tuple in {@code sortingColumns} has a null comparator, it is assumed the sorting
* happened in ES and the results are left as is (by using the row ordering), otherwise it is
* sorted based on the given criteria.
* </li>
* <li>
* If no tuple exists in {@code sortingColumns} for an output column, it means this column
* is not included at all in the ORDER BY
* </li>
*</ul>
*
* Take for example ORDER BY a, x, b, y
* a, b - are sorted in ES
* x, y - need to be sorted client-side
* sorting on x kicks in only if the values for a are equal.
* sorting on y kicks in only if the values for a, x and b are all equal
*
*/
// thanks to @jpountz for the row ordering idea as a way to preserve ordering
@SuppressWarnings("unchecked")
@Override
protected boolean lessThan(Tuple<List<?>, Integer> l, Tuple<List<?>, Integer> r) {
for (Tuple<Integer, Comparator> tuple : sortingColumns) {
int i = tuple.v1().intValue();
int columnIdx = tuple.v1().intValue();
Comparator comparator = tuple.v2();

Object vl = l.v1().get(i);
Object vr = r.v1().get(i);
// Get the values for left and right rows at the current column index
Object vl = l.v1().get(columnIdx);
Object vr = r.v1().get(columnIdx);
if (comparator != null) {
int result = comparator.compare(vl, vr);
// if things are equals, move to the next comparator
// if things are not equal: return the comparison result,
// otherwise: move to the next comparator to solve the tie.
if (result != 0) {
return result > 0;
}
}
// no comparator means the existing order needs to be preserved
// no comparator means the rows are pre-ordered by ES for the column at
// the current index and the existing order needs to be preserved
else {
// check the values - if they are equal move to the next comparator
// otherwise return the row order
// check the values - if they are not equal return the row order
// otherwise: move to the next comparator to solve the tie.
if (Objects.equals(vl, vr) == false) {
return l.v2().compareTo(r.v2()) > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private static void sorting(QueryContainer container, SearchSourceBuilder source
source.sort("_doc");
return;
}
for (Sort sortable : container.sort()) {
for (Sort sortable : container.sort().values()) {
SortBuilder<?> sortBuilder = null;

if (sortable instanceof AttributeSort) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.xpack.sql.expression.function.Functions;
import org.elasticsearch.xpack.sql.expression.function.ScoreAttribute;
import org.elasticsearch.xpack.sql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.sql.expression.function.aggregate.AggregateFunctionAttribute;
import org.elasticsearch.xpack.sql.expression.function.aggregate.CompoundNumericAggregate;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.sql.expression.function.aggregate.InnerAggregate;
Expand Down Expand Up @@ -461,20 +462,27 @@ protected PhysicalPlan rule(OrderExec plan) {
GroupByKey group = qContainer.findGroupForAgg(attr);

// TODO: might need to validate whether the target field or group actually exist
if (group != null && group != Aggs.IMPLICIT_GROUP_KEY) {
if (group!=null && group!=Aggs.IMPLICIT_GROUP_KEY) {
qContainer = qContainer.updateGroup(group.with(direction));
}
else {
// scalar functions typically require script ordering
if (attr instanceof ScalarFunctionAttribute) {
// nope, use scripted sorting
qContainer = qContainer.addSort(
new ScriptSort(((ScalarFunctionAttribute) attr).script(), direction, missing));
} else if (attr instanceof ScoreAttribute) {
qContainer = qContainer.addSort(new ScoreSort(direction, missing));
} else {
qContainer = qContainer.addSort(new AttributeSort(attr, direction, missing));
}

// scalar functions typically require script ordering
if (attr instanceof ScalarFunctionAttribute) {
ScalarFunctionAttribute sf = (ScalarFunctionAttribute) attr;
// nope, use scripted sorting
qContainer = qContainer.addSort(sf.id(), new ScriptSort(sf.script(), direction, missing));
}
// score
else if (attr instanceof ScoreAttribute) {
qContainer = qContainer.addSort(attr.id(), new ScoreSort(direction, missing));
}
// agg function
else if (attr instanceof AggregateFunctionAttribute) {
AggregateFunctionAttribute afa = (AggregateFunctionAttribute) attr;
qContainer = qContainer.addSort(afa.innerId(), new AttributeSort(attr, direction, missing));
// field, histogram
} else {
qContainer = qContainer.addSort(attr.id(), new AttributeSort(attr, direction, missing));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,12 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine;

Expand Down Expand Up @@ -79,7 +76,7 @@ public class QueryContainer {
// at scrolling, their inputs (leaves) get updated
private final AttributeMap<Pipe> scalarFunctions;

private final Set<Sort> sort;
private final Map<ExpressionId, Sort> sort;
private final int limit;

// computed
Expand All @@ -96,15 +93,15 @@ public QueryContainer(Query query,
Map<ExpressionId, Attribute> aliases,
Map<String, GroupByKey> pseudoFunctions,
AttributeMap<Pipe> scalarFunctions,
Set<Sort> sort,
Map<ExpressionId, Sort> sort,
int limit) {
this.query = query;
this.aggs = aggs == null ? Aggs.EMPTY : aggs;
this.fields = fields == null || fields.isEmpty() ? emptyList() : fields;
this.aliases = aliases == null || aliases.isEmpty() ? Collections.emptyMap() : aliases;
this.pseudoFunctions = pseudoFunctions == null || pseudoFunctions.isEmpty() ? emptyMap() : pseudoFunctions;
this.scalarFunctions = scalarFunctions == null || scalarFunctions.isEmpty() ? AttributeMap.emptyAttributeMap() : scalarFunctions;
this.sort = sort == null || sort.isEmpty() ? emptySet() : sort;
this.sort = sort == null || sort.isEmpty() ? emptyMap() : sort;
this.limit = limit;
}

Expand All @@ -118,46 +115,52 @@ public List<Tuple<Integer, Comparator>> sortingColumns() {
return emptyList();
}

for (Sort s : sort.values()) {
if (isAggregateSort(s)) {
customSort = Boolean.TRUE;
break;
}
}

// If no custom sort is used break early
if (customSort == null) {
customSort = Boolean.FALSE;
return emptyList();
}

List<Tuple<Integer, Comparator>> sortingColumns = new ArrayList<>(sort.size());
for (Map.Entry<ExpressionId, Sort> entry : sort.entrySet()) {
ExpressionId expressionId = entry.getKey();
Sort s = entry.getValue();

boolean aggSort = false;
for (Sort s : sort) {
Tuple<Integer, Comparator> tuple = new Tuple<>(Integer.valueOf(-1), null);

if (s instanceof AttributeSort) {
AttributeSort as = (AttributeSort) s;
// find the relevant column of each aggregate function
if (as.attribute() instanceof AggregateFunctionAttribute) {
aggSort = true;
AggregateFunctionAttribute afa = (AggregateFunctionAttribute) as.attribute();
afa = (AggregateFunctionAttribute) aliases.getOrDefault(afa.innerId(), afa);
int atIndex = -1;
for (int i = 0; i < fields.size(); i++) {
Tuple<FieldExtraction, ExpressionId> field = fields.get(i);
if (field.v2().equals(afa.innerId())) {
atIndex = i;
break;
}
}

if (atIndex == -1) {
throw new SqlIllegalArgumentException("Cannot find backing column for ordering aggregation [{}]", afa.name());
}
// assemble a comparator for it
Comparator comp = s.direction() == Sort.Direction.ASC ? Comparator.naturalOrder() : Comparator.reverseOrder();
comp = s.missing() == Sort.Missing.FIRST ? Comparator.nullsFirst(comp) : Comparator.nullsLast(comp);

tuple = new Tuple<>(Integer.valueOf(atIndex), comp);
int atIndex = -1;
for (int i = 0; i < fields.size(); i++) {
Tuple<FieldExtraction, ExpressionId> field = fields.get(i);
if (field.v2().equals(expressionId)) {
atIndex = i;
break;
}
}
sortingColumns.add(tuple);
}
if (atIndex == -1) {
throw new SqlIllegalArgumentException("Cannot find backing column for ordering aggregation [{}]", s);
}

if (customSort == null) {
customSort = Boolean.valueOf(aggSort);
// assemble a comparator for it, if it's not an AggregateSort
// then it's pre-sorted by ES so use null
Comparator comp = null;
if (isAggregateSort(s)) {
comp = s.direction() == Sort.Direction.ASC ? Comparator.naturalOrder() : Comparator.reverseOrder();
comp = s.missing() == Sort.Missing.FIRST ? Comparator.nullsFirst(comp) : Comparator.nullsLast(comp);
}

sortingColumns.add(new Tuple<>(Integer.valueOf(atIndex), comp));
}

return aggSort ? sortingColumns : emptyList();
return sortingColumns;
}

/**
 * Checks whether the given sort is an {@link AttributeSort} whose attribute is an
 * {@link AggregateFunctionAttribute}.
 *
 * @param s the sort to inspect
 * @return {@code true} only when {@code s} is an {@link AttributeSort} over an aggregate
 *         function attribute, {@code false} for any other kind of sort
 */
private boolean isAggregateSort(Sort s) {
    // anything that is not an attribute-based sort cannot target an aggregate function
    if ((s instanceof AttributeSort) == false) {
        return false;
    }
    AttributeSort attributeSort = (AttributeSort) s;
    return attributeSort.attribute() instanceof AggregateFunctionAttribute;
}

/**
Expand Down Expand Up @@ -212,7 +215,7 @@ public Map<String, GroupByKey> pseudoFunctions() {
return pseudoFunctions;
}

public Set<Sort> sort() {
public Map<ExpressionId, Sort> sort() {
return sort;
}

Expand Down Expand Up @@ -260,10 +263,10 @@ public QueryContainer withScalarProcessors(AttributeMap<Pipe> procs) {
return new QueryContainer(query, aggs, fields, aliases, pseudoFunctions, procs, sort, limit);
}

public QueryContainer addSort(Sort sortable) {
Set<Sort> sort = new LinkedHashSet<>(this.sort);
sort.add(sortable);
return new QueryContainer(query, aggs, fields, aliases, pseudoFunctions, scalarFunctions, sort, limit);
public QueryContainer addSort(ExpressionId expressionId, Sort sortable) {
Map<ExpressionId, Sort> newSort = new LinkedHashMap<>(this.sort);
newSort.put(expressionId, sortable);
return new QueryContainer(query, aggs, fields, aliases, pseudoFunctions, scalarFunctions, newSort, limit);
}

private String aliasName(Attribute attr) {
Expand Down
Loading

0 comments on commit 45ec73d

Please sign in to comment.