Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Multi-stage] Support partition based leaf stage processing #11234

Merged

Conversation

Jackie-Jiang
Copy link
Contributor

Add supports for the new tableOptions with partition_column and partition_size.

When this table option (hint) is attached to the leaf stage, we will honor it, and process the leaf stage for each partition with a separate thread. In order to do so, the table should be partitioned, and all the segments for any partition must be served by the same server. It will throw exception if table fails to reach this condition.
Without the hint, the leaf stage will always be processed as a whole part, which is less efficient. More importantly, when the leaf stage result is partitioned, the intermediate stage can also benefit from it by increasing the parallelism and avoid shuffling the data. It can benefit JOIN (achieve colocated join) and GROUP BY (higher parallelism) the most.

The old joinOptions is_colocated_by_join_keys is removed because it can be achieved with the new hint on both left and right table.

See some example queries in QueryHints.json

@Jackie-Jiang Jackie-Jiang added enhancement multi-stage Related to the multi-stage query engine labels Aug 1, 2023
@codecov-commenter
Copy link

codecov-commenter commented Aug 1, 2023

Codecov Report

Merging #11234 (e23c7d0) into master (044588f) will increase coverage by 0.00%.
The diff coverage is 0.00%.

@@            Coverage Diff            @@
##           master   #11234     +/-   ##
=========================================
  Coverage    0.11%    0.11%             
=========================================
  Files        2229     2155     -74     
  Lines      119951   116802   -3149     
  Branches    18171    17727    -444     
=========================================
  Hits          137      137             
+ Misses     119794   116645   -3149     
  Partials       20       20             
Flag Coverage Δ
integration1temurin11 ?
integration1temurin17 ?
integration1temurin20 ?
integration2temurin17 ?
integration2temurin20 ?
unittests1temurin11 ?
unittests1temurin17 ?
unittests1temurin20 ?
unittests2temurin11 0.11% <0.00%> (-0.01%) ⬇️
unittests2temurin17 0.11% <0.00%> (-0.01%) ⬇️
unittests2temurin20 0.11% <0.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Changed Coverage Δ
.../query/planner/logical/RelToPlanNodeConverter.java 0.00% <0.00%> (ø)
...ery/planner/physical/DispatchablePlanMetadata.java 0.00% <0.00%> (ø)
...uery/planner/physical/DispatchablePlanVisitor.java 0.00% <0.00%> (ø)
...ery/planner/physical/MailboxAssignmentVisitor.java 0.00% <0.00%> (ø)
.../apache/pinot/query/planner/plannode/JoinNode.java 0.00% <ø> (ø)
.../org/apache/pinot/query/routing/WorkerManager.java 0.00% <0.00%> (ø)

... and 74 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Contributor

@walterddr walterddr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good to me. minor comments

@@ -354,14 +349,12 @@ private ColocatedTableInfo getColocatedTableInfo(String tableName) {
TimeBoundaryInfo timeBoundaryInfo = _routingManager.getTimeBoundaryInfo(offlineTableName);
// Ignore OFFLINE side when time boundary info is unavailable
if (timeBoundaryInfo == null) {
return getRealtimeColocatedTableInfo(realtimeTableName);
return getRealtimeColocatedTableInfo(realtimeTableName, partitionKey, numPartitions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a integration-test for hybrid table colocated join? i am not sure how we can mock that in unit-test but an integration test should be super helpful to cover this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. That won't be trivial, so probably can be addressed in a separate PR.

},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partially empty right table result for some servers",
"sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'z') GROUP BY {tbl1}.name"
"sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val = 'z') GROUP BY {tbl1}.name"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add test with set stageParallelism = 2; variance to the hinted tests here. we never tested option + hints at the same time, good to add. (but can be followed up, i just tried and they all passed)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently stageParallelism is ignored. Can be added when we add the support to further split on single partition data

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will not be if we do agg on top of join that joins on one key, which is colocated, but group by on a different key -- that one will still be run with the stage parallelism setting, yes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, even though they won't be applied to the same stage at the same time. Added one query

@Jackie-Jiang Jackie-Jiang merged commit 5d312e5 into apache:master Aug 2, 2023
20 of 22 checks passed
@Jackie-Jiang Jackie-Jiang deleted the partition_worker_assignment branch August 2, 2023 17:03
{
"description": "Inner join with group by",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='true') */a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1",
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'll be good to have a few planner side tests that use both the aggOptions for is_partitioned_by_group_by_keys and use the newly table options for partitioning. One which also uses the join option to set dynamic_broadcast with table partitioning and is_partitioned_by_group_by_keys aggOption as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I didn't add plan test for this hint is because this hint is not applied during the planning time, so the plan is always the same with/without the hint

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as @Jackie-Jiang mentioned the table hints are not altering the logical plan so the only thing we can test here is verify the hints were attached properly (e.g. modify the TableScanNode toString method)

however, since we added physical plan in #11052 we can try to verify the physical plan but that's definitely a follow up IMO

Preconditions.checkState(tablePartitionInfo.getPartitionColumn().equals(partitionKey),
"Partition key: %s does not match partition column: %s for table: %s", partitionKey,
tablePartitionInfo.getPartitionColumn(), tableNameWithType);
Preconditions.checkState(tablePartitionInfo.getNumPartitions() == numPartitions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was just curious, why even allow users to pass in a numPartitions if we expect it to match the tablePartitionInfo? Can't we just look this up and set the numPartitions ourselves?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think we keep it this way for future proof, when we want different partitions within query vs table. cc @walterddr for more details

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes technically speaking the table partition hint is used for 2 reasons (mixed)

  1. to indicate what's the table partition - which we can in the future improved to be automatic
  2. indicate that data is already partitioned and no need to reshuffle the data when unnecessary - this we can debate whether we want to create a new hint for such behavior change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks for the explanation

@@ -1,33 +1,19 @@
{
"pinot_hint_option_tests": {
"queries": [
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not add some planner tests for the tableOptions for partition key + num partitions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s0nskar pushed a commit to s0nskar/pinot that referenced this pull request Aug 10, 2023
@walterddr walterddr mentioned this pull request Nov 16, 2023
6 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement multi-stage Related to the multi-stage query engine
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants