Skip to content

Commit

Permalink
[feature](Nereids) enable bucket shuffle join on fragment without sca…
Browse files Browse the repository at this point in the history
…n node (apache#12891)

In the past, with the legacy planner, we could only do a bucket shuffle join on a join node belonging to a fragment with at least one scan node.
However, a bucket shuffle join should be done on every join node whose left child's data distribution satisfies the join's demand.
In Nereids, we have data distribution info on each node, so we can enable bucket shuffle join on fragments without a scan node.
  • Loading branch information
morrySnow authored and Yijia Su committed Oct 8, 2022
1 parent 9b6191c commit 58541ad
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.DistributionSpecHash;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
Expand Down Expand Up @@ -969,10 +970,16 @@ private PlanFragment constructBucketShuffleJoin(AbstractPhysicalJoin<PhysicalPla
}
// assemble fragment
hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
if (leftDistributionSpec.getShuffleType() != ShuffleType.NATURAL) {
hashJoinNode.setDistributionMode(DistributionMode.PARTITIONED);
}
connectChildFragment(hashJoinNode, 1, leftFragment, rightFragment, context);
leftFragment.setPlanRoot(hashJoinNode);
// TODO: use left fragment d
DataPartition rhsJoinPartition = new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED,
TPartitionType partitionType = TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
if (leftDistributionSpec.getShuffleType() != ShuffleType.NATURAL) {
partitionType = TPartitionType.HASH_PARTITIONED;
}
DataPartition rhsJoinPartition = new DataPartition(partitionType,
rightPartitionExprIds.stream().map(context::findSlotRef).collect(Collectors.toList()));
rightFragment.setOutputPartition(rhsJoinPartition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public PhysicalProperties visitPhysicalAggregate(PhysicalAggregate<? extends Pla
return PhysicalProperties.GATHER;
}
// TODO: change ENFORCED back to bucketed, when coordinator could process bucket on agg correctly.
return PhysicalProperties.createHash(new DistributionSpecHash(columns, ShuffleType.ENFORCED));
return PhysicalProperties.createHash(new DistributionSpecHash(columns, ShuffleType.BUCKETED));
case DISTINCT_GLOBAL:
default:
throw new RuntimeException("Could not derive output properties for agg phase: " + agg.getAggPhase());
Expand Down
6 changes: 3 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,6 @@ private void prepare() {
}
FragmentExecParams params = fragmentExecParamsMap.get(fragment.getDestFragment().getFragmentId());
params.inputFragments.add(fragment.getFragmentId());

}

coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
Expand Down Expand Up @@ -1017,7 +1016,6 @@ private void computeFragmentExecParams() throws Exception {

int bucketSeq = 0;
int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);

// when the left table is empty, its bucket set is empty.
// set right table destination address to the address of left table
Expand All @@ -1026,6 +1024,8 @@ private void computeFragmentExecParams() throws Exception {
bucketNum = 1;
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
}
// process bucket shuffle join on fragment without scan node
TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
while (bucketSeq < bucketNum) {
TPlanFragmentDestination dest = new TPlanFragmentDestination();

Expand Down Expand Up @@ -1521,7 +1521,7 @@ private void computeScanRangeAssignment() throws Exception {
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode,
idToBackend, addressToBackendID);
}
if (!(fragmentContainsColocateJoin | fragmentContainsBucketShuffleJoin)) {
if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) {
computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public void testGlobalPhaseAggregate() {
Assertions.assertTrue(result.getOrderSpec().getOrderKeys().isEmpty());
Assertions.assertTrue(result.getDistributionSpec() instanceof DistributionSpecHash);
DistributionSpecHash actual = (DistributionSpecHash) result.getDistributionSpec();
Assertions.assertEquals(ShuffleType.ENFORCED, actual.getShuffleType());
Assertions.assertEquals(ShuffleType.BUCKETED, actual.getShuffleType());
Assertions.assertEquals(Lists.newArrayList(partition).stream()
.map(SlotReference::getExprId).collect(Collectors.toList()),
actual.getOrderedShuffledColumns());
Expand Down

0 comments on commit 58541ad

Please sign in to comment.