Skip to content

Commit

Permalink
[fix](nereids) refine window child's local shuffle dist-expr
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjian.xzj committed Aug 19, 2024
1 parent 655ac7a commit 740df95
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,6 @@ public static Expr translate(Expression expression, PlanTranslatorContext contex
return expression.accept(INSTANCE, context);
}

public static Expr translateUseBackup(Expression expression, PlanTranslatorContext context) {
context.setTranslateUsingBackup(true);
Expr result = expression.accept(INSTANCE, context);
context.setTranslateUsingBackup(false);
return result;
}

@Override
public Expr visitAlias(Alias alias, PlanTranslatorContext context) {
return alias.child().accept(this, context);
Expand Down Expand Up @@ -299,9 +292,8 @@ public Expr visitNot(Not not, PlanTranslatorContext context) {

@Override
public Expr visitSlotReference(SlotReference slotReference, PlanTranslatorContext context) {
boolean translateUsingBackup = context.getTranslateUsingBackup();
return translateUsingBackup ? context.findBackupSlotRef(slotReference.getExprId())
: context.findSlotRef(slotReference.getExprId());
return context.getCloneExprIdToSlot() == null ? context.findSlotRef(slotReference.getExprId())
: context.findCloneSlotRef(slotReference.getExprId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2122,7 +2122,9 @@ public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sor
PlanTranslatorContext context) {
PlanFragment inputFragment = sort.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(sort.child(0));
context.backupPlanToExprIdSlotRefMap(sort);
// 1. Backup current plan to exprIdToSlotRef map
context.addPlanToExprIdSlotRefMap(sort);

// 2. According to the type of sort, generate physical plan
if (!sort.getSortPhase().isMerge()) {
// For localSort or Gather->Sort, we just need to add sortNode
Expand Down Expand Up @@ -2279,8 +2281,9 @@ public PlanFragment visitPhysicalRepeat(PhysicalRepeat<? extends Plan> repeat, P
@Override
public PlanFragment visitPhysicalWindow(PhysicalWindow<? extends Plan> physicalWindow,
PlanTranslatorContext context) {
PlanFragment inputPlanFragment = physicalWindow.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(physicalWindow.child(0));
Plan childPlan = physicalWindow.child(0);
PlanFragment inputPlanFragment = childPlan.accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(childPlan);

// 1. translate to old optimizer variable
// variable in Nereids
Expand Down Expand Up @@ -2332,13 +2335,14 @@ public PlanFragment visitPhysicalWindow(PhysicalWindow<? extends Plan> physicalW
// current op tree only has two patterns, one is the window with sort child, and another is two phase
// global partition topn child, and the latter is no need to refresh its distribution expr list since
// it's expected to be the same as window's, for the former pattern, it is the real candidate.
Map<ExprId, SlotRef> exprIdToSlotRef = context.findBackupExprIdToSlotMap(physicalWindow.child(0));
context.setBackUpExprIdToSlot(exprIdToSlotRef);
List<Expr> newPartitionExprs = partitionKeyList.stream()
.map(e -> ExpressionTranslator.translateUseBackup(e, context))
.collect(Collectors.toList());
newChildDistributeExprLists.add(newPartitionExprs);
inputPlanFragment.getPlanRoot().setChildrenDistributeExprLists(newChildDistributeExprLists);
if (context.findExprIdToSlotRefFromMap(childPlan)) {
List<Expr> newPartitionExprs = partitionKeyList.stream()
.map(e -> ExpressionTranslator.translate(e, context))
.collect(Collectors.toList());
newChildDistributeExprLists.add(newPartitionExprs);
inputPlanFragment.getPlanRoot().setChildrenDistributeExprLists(newChildDistributeExprLists);
context.resetCloneExprIdToSlot();
}
}

// 2. get bufferedTupleDesc from SortNode and compute isNullableMatched
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,9 @@ public class PlanTranslatorContext {
*/
private final Map<ExprId, SlotRef> exprIdToSlotRef = Maps.newHashMap();

private boolean translateUsingBackup = false;
private final Map<Plan, Map<ExprId, SlotRef>> savedPlanToExprIdToSlotRef = Maps.newHashMap();
private final Map<Plan, Map<ExprId, SlotRef>> clonePlanToExprIdToSlotRefMap = Maps.newHashMap();

private Map<ExprId, SlotRef> backUpExprIdToSlot = Maps.newHashMap();
private Map<ExprId, SlotRef> cloneExprIdToSlot = Maps.newHashMap();

/**
* Inverted index from legacy slot to Nereids' slot.
Expand Down Expand Up @@ -193,14 +192,6 @@ public TupleDescriptor generateTupleDesc() {
return descTable.createTupleDescriptor();
}

public boolean getTranslateUsingBackup() {
return translateUsingBackup;
}

public void setTranslateUsingBackup(boolean usingBackup) {
this.translateUsingBackup = usingBackup;
}

public Optional<RuntimeFilterTranslator> getRuntimeTranslator() {
return Optional.ofNullable(translator);
}
Expand Down Expand Up @@ -230,12 +221,12 @@ public void addExprIdSlotRefPair(ExprId exprId, SlotRef slotRef) {
slotIdToExprId.put(slotRef.getDesc().getId(), exprId);
}

public void backupPlanToExprIdSlotRefMap(Plan planNode) {
Map<ExprId, SlotRef> newExprIdToSlotRef = Maps.newHashMap();
public void addPlanToExprIdSlotRefMap(Plan plan) {
Map<ExprId, SlotRef> cloneExprIdToSlotRef = Maps.newHashMap();
for (Map.Entry<ExprId, SlotRef> entry : exprIdToSlotRef.entrySet()) {
newExprIdToSlotRef.put(entry.getKey(), (SlotRef) entry.getValue().clone());
cloneExprIdToSlotRef.put(entry.getKey(), (SlotRef) entry.getValue().clone());
}
savedPlanToExprIdToSlotRef.put(planNode, newExprIdToSlotRef);
clonePlanToExprIdToSlotRefMap.put(plan, cloneExprIdToSlotRef);
}

public void addExprIdColumnRefPair(ExprId exprId, ColumnRefExpr columnRefExpr) {
Expand All @@ -258,16 +249,20 @@ public SlotRef findSlotRef(ExprId exprId) {
return exprIdToSlotRef.get(exprId);
}

public SlotRef findBackupSlotRef(ExprId exprId) {
return backUpExprIdToSlot.get(exprId);
public SlotRef findCloneSlotRef(ExprId exprId) {
return cloneExprIdToSlot.get(exprId);
}

public boolean findExprIdToSlotRefFromMap(Plan plan) {
return (cloneExprIdToSlot = clonePlanToExprIdToSlotRefMap.get(plan)) != null;
}

public Map<ExprId, SlotRef> findBackupExprIdToSlotMap(Plan plan) {
return savedPlanToExprIdToSlotRef.get(plan);
public Map<ExprId, SlotRef> getCloneExprIdToSlot() {
return cloneExprIdToSlot;
}

public void setBackUpExprIdToSlot(Map<ExprId, SlotRef> map) {
backUpExprIdToSlot = map;
public void resetCloneExprIdToSlot() {
cloneExprIdToSlot = null;
}

public ColumnRefExpr findColumnRef(ExprId exprId) {
Expand Down

0 comments on commit 740df95

Please sign in to comment.