[multistage] Add support for the ranking ROW_NUMBER() window function #10587

Merged

@somandal somandal commented Apr 10, 2023

The previous PR for this, #10527, was accidentally merged and had to be reverted. This PR contains the same changes.

This PR adds support for the ranking ROW_NUMBER() window function in Apache Pinot. ROW_NUMBER() requires ROW type window function support rather than the RANGE type that we added support for in Phase 1. This PR sets up a potential framework for ROW type window functions but implements it only for ROW_NUMBER(). ROW_NUMBER() can be used in the following types of queries:

  • Empty OVER()(s) with some select column [see limitations below]
  • OVER(ORDER BY)(s)
  • OVER(PARTITION BY)(s)
  • OVER(PARTITION BY ORDER BY)(s)
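
To make the semantics of these query shapes concrete, here is a minimal sketch of what ROW_NUMBER() OVER(PARTITION BY ... ORDER BY ...) computes. It is not the operator code from this PR: the class name, method name, and the parallel-array row layout are assumptions made for illustration only. An empty OVER() corresponds to passing the same partition key for every row.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only; hypothetical helper, not Pinot's window operator. */
final class RowNumberSketch {
  /**
   * Returns, for each input row (in input order), its row number within its
   * partition when the partition is ordered ascending by the order key.
   */
  static long[] rowNumbers(String[] partitionKeys, int[] orderKeys) {
    // Group input positions by partition key.
    Map<String, List<Integer>> partitions = new HashMap<>();
    for (int i = 0; i < partitionKeys.length; i++) {
      partitions.computeIfAbsent(partitionKeys[i], k -> new ArrayList<>()).add(i);
    }
    long[] result = new long[partitionKeys.length];
    for (List<Integer> positions : partitions.values()) {
      // List.sort is stable, so ties keep input order in this sketch.
      // A real engine gives no such guarantee for tied order keys.
      positions.sort(Comparator.comparingInt(i -> orderKeys[i]));
      long counter = 1;
      for (int position : positions) {
        result[position] = counter++;
      }
    }
    return result;
  }
}
```

Each partition gets its own counter starting at 1, which is why the function depends on physical row position rather than on the value of the order key.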

ROW_NUMBER() currently has the following limitations:

  • Apache Calcite enforces the window function type to ROW. Because multiple window groups are not supported today, ROW_NUMBER() cannot be combined with other window aggregation functions in the same query.
  • Queries that use an empty OVER() without selecting any other column result in Apache Calcite not projecting any columns, e.g. SELECT ROW_NUMBER() OVER() from table;. I've added a TODO to look into how to get Apache Calcite to project at least one column in this scenario.
    • See ProjectWindowTransposeRule to better understand what's happening here. It tries to push a Project below the Window but finds no input fields referenced, so it creates an empty Project below the Window. The Project above the Window is then marked as trivial and removed, resulting in a plan like the following:
Query: SELECT ROW_NUMBER() OVER() FROM a

Execution Plan
LogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
  LogicalExchange(distribution=[hash])
    LogicalProject
      LogicalTableScan(table=[[a]])

The design document and issue for window functions support can be found below:

Prior Phase 1 PRs related to window functions:

cc @siddharthteotia @walterddr @vvivekiyer @ankitsultana

@@ -1107,6 +1323,22 @@
"\n"
]
Contributor

Can we also cover the subquery style in the plans?

Something like

SELECT * FROM (
    SELECT 
        ROW_NUMBER() OVER (.....) rn, 
        c1, 
        c2, 
        c3
    FROM
        foo
    ) temp    
WHERE
    rn > 10 AND rn <=20;

Contributor

For the sake of completeness in the planning tests, can we also add WITH / CTE queries if possible?

WITH cte AS(
    SELECT 
        ROW_NUMBER() OVER (.......) rn, 
        c1, 
        c2, 
        c3
    FROM
        foo
)
SELECT 
    * 
FROM 
    cte
WHERE 
    rn > 10 AND 
    rn <= 20;

Contributor Author

@siddharthteotia I actually have a couple of plans of these types at the end of this file. Are these sufficient?

      {
        "description": "Window function CTE: row_number WITH statement having OVER with PARTITION BY ORDER BY",
        "sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3) as rownum from a) SELECT a.col1, a.rownum FROM windowfunc AS a where a.rownum < 5",
        "output": [
          "Execution Plan",
          "\nLogicalProject(col1=[$0], $1=[$3])",
          "\n  LogicalFilter(condition=[<($3, 5)])",
          "\n    LogicalWindow(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
          "\n      PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
          "\n        LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
          "\n          LogicalTableScan(table=[[a]])",
          "\n"
        ]
      },
      {
        "description": "Window function subquery: row_number having OVER with PARTITION BY ORDER BY",
        "sql": "EXPLAIN PLAN FOR SELECT row_number, col2, col3 FROM (SELECT ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3 DESC) as row_number, a.col2, a.col3 FROM a) WHERE row_number <= 10",
        "output": [
          "Execution Plan",
          "\nLogicalProject(row_number=[$2], col2=[$0], col3=[$1])",
          "\n  LogicalFilter(condition=[<=($2, 10)])",
          "\n    LogicalWindow(window#0=[window(partition {0} order by [1 DESC] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
          "\n      PinotLogicalSortExchange(distribution=[hash[0]], collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
          "\n        LogicalProject(col2=[$1], col3=[$2])",
          "\n          LogicalTableScan(table=[[a]])",
          "\n"
        ]
      }

Contributor

Looks good. Thanks for clarifying.

Can we add an example of a JOIN query with ROW_NUMBER() where the subquery does a JOIN and the outer query picks a particular ROW_NUMBER() value for each partition?

SELECT
    <columnList>
FROM
    (
        SELECT
            <columnList>,
            ROW_NUMBER() OVER(PARTITION BY .. ORDER BY ... DESC) AS rn
        FROM
            T1 INNER JOIN T2
            ON T1.c1 = T2.c2
        WHERE .......
    )
WHERE rn = 1

So the outer query picks the top row from each partition, i.e. the one with the highest ordering value, since each partition is ordered descending.
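
As a rough illustration of the net effect of the rn = 1 filter over a descending order, the sketch below keeps one row per partition: the one with the largest order key. It deliberately skips materializing row numbers and uses a single pass instead, so it shows the result of the query rather than how the engine would execute it. The class and method names and the parallel-array layout are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only; hypothetical helper, not part of this PR. */
final class TopRowPerPartitionSketch {
  /** Returns, per partition key, the input index of the row that `rn = 1` would keep. */
  static Map<String, Integer> topRowIndex(String[] partitionKeys, int[] orderKeys) {
    Map<String, Integer> best = new LinkedHashMap<>();
    for (int i = 0; i < partitionKeys.length; i++) {
      Integer current = best.get(partitionKeys[i]);
      // DESC ordering: a strictly larger order key takes row number 1.
      // On ties this sketch keeps the first row seen; SQL leaves that choice unspecified.
      if (current == null || orderKeys[i] > orderKeys[current]) {
        best.put(partitionKeys[i], i);
      }
    }
    return best;
  }
}
```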

Contributor Author

done as part of #10684

{
"description": "single OVER(ORDER BY) row_number and select col with select alias",
"sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, ROW_NUMBER() OVER(ORDER BY a.col2) AS row_number FROM a",
"notes": "TODO: Look into why aliases are getting ignored in the final plan",
Contributor

Can we create an issue for this? Otherwise the TODO might just get buried in the code.

Contributor Author

Done: #10682

{
"description": "single OVER(ORDER BY) row_number and select col",
"sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(ORDER BY a.col2) FROM a",
"output": [
Contributor

@siddharthteotia siddharthteotia Apr 24, 2023

It's hard to review the tests while constantly scrolling up and down to check whether a variant I have in mind has already been added.

So please feel free to ignore or resolve this if the following are already covered. I'm just trying to make sure we have proper coverage.

SELECT a.col1, ROW_NUMBER() OVER(ORDER BY a.col2)
FROM a ORDER BY a.col1;

SELECT a.col1, ROW_NUMBER() OVER(ORDER BY a.col2)
FROM a ORDER BY a.col2;

SELECT a.col1, ROW_NUMBER() OVER(ORDER BY a.col2) AS rn
FROM a ORDER BY rn DESC;

Contributor Author

done as part of #10684

"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
Contributor

I probably have this question because I missed the generic sort discussion while I was out.

Can you please clarify why we have a LogicalSort before the LogicalSortExchange when the goal is to exchange without sorting on the sender side and to sort on the receiver side instead? Ideally, the LogicalSort above the LogicalSortExchange should take care of that.

Contributor Author

The LogicalSort -> PinotLogicalSortExchange -> LogicalSort pattern is used for global ORDER BY queries, both for the ordering itself and for applying LIMITs (e.g. select foo from table a order by bar). It still makes sense to sort on the sender side in order to apply limits (if at the leaf stage). Also note that the data may be distributed below the PinotLogicalSortExchange, which is why sorting is needed after the exchange as well.

Our eventual goal is to detect in the LogicalSortExchange that the incoming data is already sorted and then perform a merge sort instead of the full priority-queue-based sort. This will greatly speed up ordering throughout the nodes and even allow sending partial results rather than waiting for all the rows at each ordering step.

Let's discuss this in more detail; hopefully that helps clarify.

By the way, there are optimization opportunities in the plans. Today we don't check whether the window has already sorted the data; if the ordering required by the ORDER BY uses the same column and collation, we could skip the sort above the window, etc.
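
The merge step mentioned above can be sketched roughly as follows, assuming each sender has already sorted (and possibly limited) its rows. This is not the exchange implementation in Pinot; the class name, method name, and the use of int arrays as sorted inputs are assumptions for illustration. The heap here holds only one head element per input rather than every row, which is what distinguishes it from a full priority-queue sort, and the loop can stop as soon as the limit is reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative only; hypothetical receiver-side merge of pre-sorted inputs. */
final class SortedMergeSketch {
  /** Merges inputs that are each sorted ascending, stopping after `limit` rows. */
  static List<Integer> mergeSorted(List<int[]> sortedInputs, int limit) {
    // Heap entries are {value, inputIndex, offsetInInput}, ordered by value.
    PriorityQueue<int[]> heads = new PriorityQueue<>((x, y) -> Integer.compare(x[0], y[0]));
    for (int i = 0; i < sortedInputs.size(); i++) {
      if (sortedInputs.get(i).length > 0) {
        heads.add(new int[]{sortedInputs.get(i)[0], i, 0});
      }
    }
    List<Integer> merged = new ArrayList<>();
    while (!heads.isEmpty() && merged.size() < limit) {
      int[] head = heads.poll();
      merged.add(head[0]);
      // Refill the heap from the input that just produced a row.
      int[] source = sortedInputs.get(head[1]);
      int next = head[2] + 1;
      if (next < source.length) {
        heads.add(new int[]{source[next], head[1], next});
      }
    }
    return merged;
  }
}
```

Because rows are emitted in final order as they are polled, a receiver built this way could forward partial results before all inputs are exhausted.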

Contributor

@siddharthteotia siddharthteotia Apr 25, 2023

Sounds good. Let's discuss this separately.

@@ -646,6 +803,21 @@
"\n"
]
},
{
"description": "single OVER(PARTITION BY) row_number with select col and group by",
"sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY a.col1) FROM a GROUP BY a.col1, a.col3",
Contributor

@siddharthteotia siddharthteotia Apr 24, 2023

Maybe I missed it, but I don't see any tests for the OVER(PARTITION BY .. ORDER BY ..) variant. Is that not supported in this PR?

Contributor

nvm... I just saw them. Ignore

System.arraycopy(existingRow, 0, row, 0, existingRow.length);
for (int i = 0; i < _windowAccumulators.length; i++) {
row[i + existingRow.length] = _windowAccumulators[i].getResultForKeys(partitionKey, orderKey);
if (_windowFrame.getWindowFrameType() == WindowNode.WindowFrameType.RANGE) {
Contributor

(nit) Can you add a minor comment indicating that we will enter this block for these .... window functions?

Contributor Author

done as part of #10684

@@ -52,8 +53,10 @@
* columns and in addition will add the aggregation columns to the output data.
* [input columns, aggregate result1, ... aggregate resultN]
*
* The window functions supported today are SUM/COUNT/MIN/MAX/AVG/BOOL_OR/BOOL_AND aggregations. Window functions also
* include other types of functions such as rank and value functions.
* The window functions supported today are:
Contributor

Suggest adding a minor note on RANGE vs ROW for future readers and which window functions fall into which category and the corresponding TODOs.
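
For readers unfamiliar with the distinction, here is a small sketch of how the two frame types differ under standard SQL semantics for a frame of UNBOUNDED PRECEDING to CURRENT ROW. With ROWS, the frame ends at the current physical row, which is what ROW_NUMBER() needs; with RANGE, the frame also includes every peer row that shares the current row's order key. The class and method names are hypothetical and the input is assumed to be already sorted by the order key.

```java
/** Illustrative only; contrasts ROWS and RANGE frames using a running COUNT. */
final class FrameTypeSketch {
  /** ROWS frame: counts physical rows up to and including the current row. */
  static long[] countOverRows(int[] sortedOrderKeys) {
    long[] out = new long[sortedOrderKeys.length];
    for (int i = 0; i < out.length; i++) {
      out[i] = i + 1;
    }
    return out;
  }

  /** RANGE frame: the frame also covers all peers (equal order keys) of the current row. */
  static long[] countOverRange(int[] sortedOrderKeys) {
    long[] out = new long[sortedOrderKeys.length];
    int start = 0;
    while (start < out.length) {
      // Find the last row that shares the order key of the row at `start`.
      int end = start;
      while (end + 1 < out.length && sortedOrderKeys[end + 1] == sortedOrderKeys[start]) {
        end++;
      }
      for (int j = start; j <= end; j++) {
        out[j] = end + 1;
      }
      start = end + 1;
    }
    return out;
  }
}
```

For order keys 10, 20, 20, 30 the ROWS variant yields 1, 2, 3, 4 while the RANGE variant yields 1, 3, 3, 4, because the two rows with key 20 are peers.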

Contributor Author

done as part of #10684

@siddharthteotia siddharthteotia merged commit 4914947 into apache:master Apr 24, 2023
@somandal
Contributor Author

@siddharthteotia thanks for the review! I'll address these comments as a separate PR (don't want to mix this into the other PR I have open to fix the empty LogicalProject issue as it'll be confusing).

@somandal
Contributor Author

@siddharthteotia opened a PR to address comments: #10684
