Skip to content

Commit

Permalink
[fix](nereids) fix union all instance number (#39999) (#40100)
Browse files Browse the repository at this point in the history
pick from master #39999
  • Loading branch information
xzj7019 authored Sep 2, 2024
1 parent 3ee0e2b commit a4d124e
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 20 deletions.
43 changes: 23 additions & 20 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,6 @@ private void computeFragmentHosts() throws Exception {
}

Pair<PlanNode, PlanNode> pairNodes = findLeftmostNode(fragment.getPlanRoot());
PlanNode fatherNode = pairNodes.first;
PlanNode leftMostNode = pairNodes.second;

/*
Expand All @@ -1672,25 +1671,8 @@ private void computeFragmentHosts() throws Exception {
// (Case B)
// there is no leftmost scan; we assign the same hosts as those of our
// input fragment which has a higher instance_number

int inputFragmentIndex = 0;
int maxParallelism = 0;
// If the fragment has three children, then the first child and the second child are
// the children(both exchange node) of shuffle HashJoinNode,
// and the third child is the right child(ExchangeNode) of broadcast HashJoinNode.
// We only need to pay attention to the maximum parallelism among
// the two ExchangeNodes of shuffle HashJoinNode.
int childrenCount = (fatherNode != null) ? fatherNode.getChildren().size() : 1;
for (int j = 0; j < childrenCount; j++) {
int currentChildFragmentParallelism
= fragmentExecParamsMap.get(fragment.getChild(j).getFragmentId()).instanceExecParams.size();
if (currentChildFragmentParallelism > maxParallelism) {
maxParallelism = currentChildFragmentParallelism;
inputFragmentIndex = j;
}
}

PlanFragmentId inputFragmentId = fragment.getChild(inputFragmentIndex).getFragmentId();
int maxParallelFragmentIndex = findMaxParallelFragmentIndex(fragment);
PlanFragmentId inputFragmentId = fragment.getChild(maxParallelFragmentIndex).getFragmentId();
// AddAll() soft copy()
int exchangeInstances = -1;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
Expand Down Expand Up @@ -1838,6 +1820,27 @@ private void computeFragmentHosts() throws Exception {
}
}

private int findMaxParallelFragmentIndex(PlanFragment fragment) {
Preconditions.checkState(!fragment.getChildren().isEmpty(), "fragment has no children");

// exclude broadcast join right side's child fragments
List<PlanFragment> childFragmentCandidates = fragment.getChildren().stream()
.filter(e -> e.getOutputPartition() != DataPartition.UNPARTITIONED)
.collect(Collectors.toList());

int maxParallelism = 0;
int maxParaIndex = 0;
for (int i = 0; i < childFragmentCandidates.size(); i++) {
PlanFragmentId childFragmentId = childFragmentCandidates.get(i).getFragmentId();
int currentChildFragmentParallelism = fragmentExecParamsMap.get(childFragmentId).instanceExecParams.size();
if (currentChildFragmentParallelism > maxParallelism) {
maxParallelism = currentChildFragmentParallelism;
maxParaIndex = i;
}
}
return maxParaIndex;
}

// Traverse the expected runtimeFilterID in each fragment, and establish the corresponding relationship
// between runtimeFilterID and fragment instance addr and select the merge instance of runtimeFilter
private void assignRuntimeFilterAddr() throws Exception {
Expand Down
88 changes: 88 additions & 0 deletions regression-test/suites/query_p0/union/test_union_instance.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_union_instance") {
multi_sql """
drop table if exists t1;
drop table if exists t2;
drop table if exists t3;
drop table if exists t4;
CREATE TABLE `t1` (
`c1` int NULL,
`c2` int NULL,
`c3` int NULL
) ENGINE=OLAP
DUPLICATE KEY(`c1`, `c2`, `c3`)
DISTRIBUTED BY HASH(`c1`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1");
insert into t1 values (1,1,1);
insert into t1 values (2,2,2);
insert into t1 values (3,3,3);
CREATE TABLE `t2` (
`year_week` varchar(45) NULL
) ENGINE=OLAP
DUPLICATE KEY(`year_week`)
DISTRIBUTED BY HASH(`year_week`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1");
CREATE TABLE `t3` (
`unique_key` varchar(2999) NULL,
`brand_name` varchar(96) NULL,
`skc` varchar(150) NULL
) ENGINE=OLAP
UNIQUE KEY(`unique_key`)
DISTRIBUTED BY HASH(`unique_key`) BUCKETS 20
PROPERTIES (
"replication_allocation" = "tag.location.default: 1");
CREATE TABLE `t4` (
`skc_code` varchar(150) NULL
) ENGINE=OLAP
UNIQUE KEY(`skc_code`)
DISTRIBUTED BY HASH(`skc_code`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1");
set parallel_pipeline_task_num=1;
set disable_nereids_rules='PRUNE_EMPTY_PARTITION';
"""
explain {
sql """
SELECT `t`.`year_week` AS `year_week`
,'' AS `brand_name`
, '' AS `skc_code`
FROM `t1` a
CROSS JOIN `t2` t
union all
SELECT '1' AS `year_week`
,`a`.`brand_name` AS `brand_name`
,`a`.`skc` AS `skc_code`
FROM `t3` a
INNER JOIN[shuffle] `t4` b ON `a`.`skc` = `b`.`skc_code`
GROUP BY 1, 2, 3;
"""
contains "999:VNESTED LOOP JOIN"
contains "1005:VEXCHANGE"
contains "1015:VEXCHANGE"
contains "1025:VEXCHANGE"
contains "1040:VUNION"
}
}

0 comments on commit a4d124e

Please sign in to comment.