-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[multistage] Add support for the ranking ROW_NUMBER() window function #10587
[multistage] Add support for the ranking ROW_NUMBER() window function #10587
Conversation
@@ -1107,6 +1323,22 @@ | |||
"\n" | |||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also cover the subquery style in the plans ?
Something like
SELECT * FROM (
SELECT
ROW_NUMBER() OVER (.....) rn,
c1,
c2,
c3
FROM
foo
) temp
WHERE
rn > 10 AND rn <=20;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the sake of completeness in the planning tests, can we also add WITH / CTE if possible ?
WITH cte AS(
SELECT
ROW_NUMBER() OVER (.......) rn,
c1,
c2,
c3
FROM
foo
)
SELECT
*
FROM
cte
WHERE
rn > 10 AND
rn <= 20;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@siddharthteotia I actually have a couple of plans of these types at the end of this file. Are these sufficient?:
{
"description": "Window function CTE: row_number WITH statement having OVER with PARTITION BY ORDER BY",
"sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3) as rownum from a) SELECT a.col1, a.rownum FROM windowfunc AS a where a.rownum < 5",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$3])",
"\n LogicalFilter(condition=[<($3, 5)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
"\n PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
{
"description": "Window function subquery: row_number having OVER with PARTITION BY ORDER BY",
"sql": "EXPLAIN PLAN FOR SELECT row_number, col2, col3 FROM (SELECT ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3 DESC) as row_number, a.col2, a.col3 FROM a) WHERE row_number <= 10",
"output": [
"Execution Plan",
"\nLogicalProject(row_number=[$2], col2=[$0], col3=[$1])",
"\n LogicalFilter(condition=[<=($2, 10)])",
"\n LogicalWindow(window#0=[window(partition {0} order by [1 DESC] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
"\n PinotLogicalSortExchange(distribution=[hash[0]], collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Thanks for clarifying
Can we add an example of a JOIN query with ROW_NUMBER where the subquery does a JOIN and we pick the particular ROW_NUMBER for each partition in the outer query ?
SELECT
<columnList>
FROM
(
SELECT
<columnList>,
ROW_NUMBER OVER(PARTITION BY .. ORDER BY ... DESC) AS rn
FROM
T1 INNER JOIN T2
ON T1.c1 = T2.c2
WHERE .......
)
WHERE rn = 1
So we pick the highest guy (since each partition is ordered descending) in the outer query from each partition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done as part of #10684
{ | ||
"description": "single OVER(ORDER BY) row_number and select col with select alias", | ||
"sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, ROW_NUMBER() OVER(ORDER BY a.col2) AS row_number FROM a", | ||
"notes": "TODO: Look into why aliases are getting ignored in the final plan", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we create an issue for this otherwise TODO might just get buried in the code ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done: #10682
{ | ||
"description": "single OVER(ORDER BY) row_number and select col", | ||
"sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(ORDER BY a.col2) FROM a", | ||
"output": [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's hard to review tests with constant scroll up and down to see if the test variant that comes to my mind has been added or not
So please feel free to ignore / resolve if already added the following. I am just trying to see we have proper coverage
SELECT a.col1, ROW_NUMBER() OVER(ORDER BY a.col2)
FROM a ORDER BY a.col1
SELECT a.col1, ROW_NUMBER() OVER(ORDER BY a.col2)
FROM a ORDER BY a.col2
SELECT a.col1, ROW_NUMBER() AS rn OVER(ORDER BY a.col2)
FROM a ORDER BY rn DESC
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done as part of #10684
"Execution Plan", | ||
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])", | ||
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])", | ||
"\n LogicalSort(sort0=[$0], dir0=[ASC])", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I probably have this question because I missed the generic sort discussion while I was out.
Can you please clarify why we have LogicalSort
before LogicalSortExchange
when the goal is to do exchange without sorting on the sender side and instead do on the receiver side and for that LogicalSort
above LogicalSortExchange
should take care of that ideally ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pattern of LogicalSort -> PinotLogicalSortExchange -> LogicalSort is a pattern followed for global ORDER BY type queries for the ordering part or to apply LIMITs (e.g. select foo from table a order by bar). It still makes sense to do ordering on the sender side to apply limits (if at the leaf stage). Also note that the data may be distributed below the PinotLogicalSortExchange, and that's why the sorting is needed after the exchange as well.
Our eventual goal is to detect that the incoming data is already sorted in the LogicalSortExchange and then perform a merge sort instead of the full priority queue based sorting. This will greatly speed up ordering throughout the nodes and even allow sending partial results rather than waiting for all the rows at each ordering step.
Let's discuss this in more detail and hopefully that can help clarify.
There are optimization opportunities in the plans btw. Today we don't check that if the window has already sorted the data, and if the ordering required by order by is the same column + collation we can skip the sort above window etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. Let's discuss this separately
@@ -646,6 +803,21 @@ | |||
"\n" | |||
] | |||
}, | |||
{ | |||
"description": "single OVER(PARTITION BY) row_number with select col and group by", | |||
"sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY a.col1) FROM a GROUP BY a.col1, a.col3", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be I missed it but I don't see any tests for OVER(PARTITION BY .. ORDER BY ..)
variant ? Is that not supported in this PR ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm... I just saw them. Ignore
System.arraycopy(existingRow, 0, row, 0, existingRow.length); | ||
for (int i = 0; i < _windowAccumulators.length; i++) { | ||
row[i + existingRow.length] = _windowAccumulators[i].getResultForKeys(partitionKey, orderKey); | ||
if (_windowFrame.getWindowFrameType() == WindowNode.WindowFrameType.RANGE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) can you add a minor comment indicating that we will enter this block for these .... window functions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done as part of #10684
@@ -52,8 +53,10 @@ | |||
* columns and in addition will add the aggregation columns to the output data. | |||
* [input columns, aggregate result1, ... aggregate resultN] | |||
* | |||
* The window functions supported today are SUM/COUNT/MIN/MAX/AVG/BOOL_OR/BOOL_AND aggregations. Window functions also | |||
* include other types of functions such as rank and value functions. | |||
* The window functions supported today are: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest adding a minor note on RANGE vs ROW for future readers and which window functions fall into which category and the corresponding TODOs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done as part of #10684
@siddharthteotia thanks for the review! I'll address these comments as a separate PR (don't want to mix this into the other PR I have open to fix the empty LogicalProject issue as it'll be confusing). |
@siddharthteotia opened a PR to address comments: #10684 |
The previous PR #10527 for this was accidentally merged and had to be reverted. This PR is the same
This PR adds support for the ranking ROW_NUMBER() window function in Apache Pinot. ROW_NUMBER() requires ROW type window function support rather than RANGE type for which we added support in Phase 1. This PR sets up a potential framework to use ROW type window functions but only implements this for ROW_NUMBER(). ROW_NUMBER() can be used in the following types of queries:
There are some limitations with ROW_NUMBER() which are:
The design document and issue for window functions support can be found below:
Prior Phase 1 PRs related to window functions:
cc @siddharthteotia @walterddr @vvivekiyer @ankitsultana