Skip to content

Commit

Permalink
pick some pr from to branch21 #38115 #38008 #37929 (#38940)
Browse files Browse the repository at this point in the history
## Proposed changes

pr: #38115
commitId: 2b29288

pr: #38008
commitId: c6b924d

pr: #37929
commitId: d44fcdc
  • Loading branch information
seawinde authored Aug 7, 2024
1 parent 2543b56 commit 2b1aa05
Show file tree
Hide file tree
Showing 45 changed files with 2,841 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,9 @@ public void analyzeSelectClause(Analyzer analyzer) throws AnalysisException {

Expr selectListItemExpr = selectListItem.getExpr();
selectListItemExpr.setDisableTableName(true);
if (!(selectListItemExpr instanceof SlotRef) && !(selectListItemExpr instanceof FunctionCallExpr)
&& !(selectListItemExpr instanceof ArithmeticExpr)) {
Expr realItem = selectListItemExpr.unwrapExpr(false);
if (!(realItem instanceof SlotRef) && !(realItem instanceof FunctionCallExpr)
&& !(realItem instanceof ArithmeticExpr)) {
throw new AnalysisException("The materialized view only support the single column or function expr. "
+ "Error column: " + selectListItemExpr.toSql());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiPredicate;

/**
* when do some operation, do something about cache
Expand Down Expand Up @@ -76,13 +77,17 @@ public Set<BaseTableInfo> getMtmvsByBaseTableOneLevel(BaseTableInfo table) {
* @param ctx
* @return
*/
public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos, ConnectContext ctx) {
public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos, ConnectContext ctx,
boolean forceConsistent, BiPredicate<ConnectContext, MTMV> predicate) {
Set<MTMV> res = Sets.newLinkedHashSet();
Set<BaseTableInfo> mvInfos = getMTMVInfos(tableInfos);
for (BaseTableInfo tableInfo : mvInfos) {
try {
MTMV mtmv = (MTMV) MTMVUtil.getTable(tableInfo);
if (isMVPartitionValid(mtmv, ctx)) {
if (predicate.test(ctx, mtmv)) {
continue;
}
if (isMVPartitionValid(mtmv, ctx, forceConsistent)) {
res.add(mtmv);
}
} catch (AnalysisException e) {
Expand All @@ -94,9 +99,10 @@ public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos, ConnectContex
}

@VisibleForTesting
public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx) {
public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean forceConsistent) {
long currentTimeMillis = System.currentTimeMillis();
return !CollectionUtils
.isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx, System.currentTimeMillis()));
.isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMillis, forceConsistent));
}

private Set<BaseTableInfo> getMTMVInfos(List<BaseTableInfo> tableInfos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,9 @@ public class MTMVRewriteUtil {
* @return
*/
public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx,
long currentTimeMills) {
long currentTimeMills, boolean forceConsistent) {
List<Partition> res = Lists.newArrayList();
Collection<Partition> allPartitions = mtmv.getPartitions();
// check session variable if enable rewrite
if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) {
return res;
}
if (MTMVUtil.mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable()
.isMaterializedViewRewriteEnableContainExternalTable()) {
return res;
}

MTMVRelation mtmvRelation = mtmv.getRelation();
if (mtmvRelation == null) {
return res;
Expand All @@ -71,7 +62,7 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
long gracePeriodMills = mtmv.getGracePeriod();
for (Partition partition : allPartitions) {
if (gracePeriodMills > 0 && currentTimeMills <= (partition.getVisibleVersionTime()
+ gracePeriodMills)) {
+ gracePeriodMills) && !forceConsistent) {
res.add(partition);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public class CascadesContext implements ScheduleContext {
private final Optional<CTEId> currentTree;
private final Optional<CascadesContext> parent;

private final List<MaterializationContext> materializationContexts;
private final Set<MaterializationContext> materializationContexts;
private boolean isLeadingJoin = false;

private boolean isLeadingDisableJoinReorder = false;
Expand Down Expand Up @@ -160,7 +160,7 @@ private CascadesContext(Optional<CascadesContext> parent, Optional<CTEId> curren
this.currentJobContext = new JobContext(this, requireProperties, Double.MAX_VALUE);
this.subqueryExprIsAnalyzed = new HashMap<>();
this.runtimeFilterContext = new RuntimeFilterContext(getConnectContext().getSessionVariable());
this.materializationContexts = new ArrayList<>();
this.materializationContexts = new HashSet<>();
if (statementContext.getConnectContext() != null) {
ConnectContext connectContext = statementContext.getConnectContext();
SessionVariable sessionVariable = connectContext.getSessionVariable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public class NereidsPlanner extends Planner {
// The cost of optimized plan
private double cost = 0;
private LogicalPlanAdapter logicalPlanAdapter;
private List<PlannerHook> hooks = new ArrayList<>();

public NereidsPlanner(StatementContext statementContext) {
this.statementContext = statementContext;
Expand Down Expand Up @@ -274,7 +273,7 @@ private void analyze(boolean showPlanProcess) {
LOG.debug("Start analyze plan");
}
keepOrShowPlanProcess(showPlanProcess, () -> cascadesContext.newAnalyzer().analyze());
getHooks().forEach(hook -> hook.afterAnalyze(this));
this.statementContext.getPlannerHooks().forEach(hook -> hook.afterAnalyze(this));
NereidsTracer.logImportantTime("EndAnalyzePlan");
if (LOG.isDebugEnabled()) {
LOG.debug("End analyze plan");
Expand Down Expand Up @@ -640,14 +639,6 @@ public LogicalPlanAdapter getLogicalPlanAdapter() {
return logicalPlanAdapter;
}

public List<PlannerHook> getHooks() {
return hooks;
}

public void addHook(PlannerHook hook) {
this.hooks.add(hook);
}

private String getTimeMetricString(Function<SummaryProfile, String> profileSupplier) {
return getProfile(summaryProfile -> {
String metricString = profileSupplier.apply(summaryProfile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public class StatementContext implements Closeable {

private FormatOptions formatOptions = FormatOptions.getDefault();

private List<PlannerHook> plannerHooks = new ArrayList<>();

public StatementContext() {
this(ConnectContext.get(), null, 0);
}
Expand Down Expand Up @@ -488,6 +490,14 @@ public FormatOptions getFormatOptions() {
return formatOptions;
}

public List<PlannerHook> getPlannerHooks() {
return plannerHooks;
}

public void addPlannerHook(PlannerHook plannerHook) {
this.plannerHooks.add(plannerHook);
}

private static class CloseableResource implements Closeable {
public final String resourceName;
public final String threadName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.doris.nereids.jobs.JobType;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -102,10 +101,6 @@ private List<Rule> getJoinRules() {
}

private List<Rule> getMvRules() {
ConnectContext connectContext = context.getCascadesContext().getConnectContext();
if (connectContext.getSessionVariable().isEnableMaterializedViewRewrite()) {
return getRuleSet().getMaterializedViewRules();
}
return ImmutableList.of();
return getRuleSet().getMaterializedViewRules();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.rewrite.RewriteJob;
import org.apache.doris.nereids.rules.analysis.AddInitMaterializationHook;
import org.apache.doris.nereids.rules.analysis.AdjustAggregateNullableForEmptySet;
import org.apache.doris.nereids.rules.analysis.AnalyzeCTE;
import org.apache.doris.nereids.rules.analysis.BindExpression;
Expand Down Expand Up @@ -123,6 +124,7 @@ private static List<RewriteJob> buildAnalyzerJobs(Optional<CustomTableResolver>
bottomUp(new BindExpression()),
topDown(new BindSink()),
bottomUp(new CheckAfterBind()),
bottomUp(new AddInitMaterializationHook()),
bottomUp(
new ProjectToGlobalAggregate(),
// this rule check's the logicalProject node's isDistinct property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public enum RuleType {
BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
BINDING_RELATION(RuleTypeClass.REWRITE),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.analysis;

import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.exploration.mv.InitConsistentMaterializationContextHook;
import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;

import com.google.common.collect.ImmutableList;

import java.util.List;

/**
* Add init materialization hook for table sink and file sink
* */
public class AddInitMaterializationHook implements AnalysisRuleFactory {

@Override
public List<Rule> buildRules() {
return ImmutableList.of(
RuleType.INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK.build(logicalFileSink()
.thenApply(ctx -> {
if (ctx.connectContext.getSessionVariable().isEnableDmlMaterializedViewRewrite()) {
ctx.statementContext.addPlannerHook(InitConsistentMaterializationContextHook.INSTANCE);
}
return ctx.root;
})),
RuleType.INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK.build(
any().when(LogicalTableSink.class::isInstance)
.thenApply(ctx -> {
if (ctx.connectContext.getSessionVariable().isEnableDmlMaterializedViewRewrite()) {
ctx.statementContext.addPlannerHook(InitConsistentMaterializationContextHook.INSTANCE);
}
return ctx.root;
}))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
queryTopPlan,
materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping,
true,
queryStructInfo.getTableBitSet());
boolean isRewrittenQueryExpressionValid = true;
if (!rewrittenQueryExpressions.isEmpty()) {
Expand Down Expand Up @@ -356,7 +355,7 @@ private boolean isGroupByEquals(Pair<Plan, LogicalAggregate<Plan>> queryTopPlanA
viewGroupExpressionQueryBased
);
}
if (queryGroupShuttledExpression.equals(viewShuttledExpressionQueryBasedToGroupByExpressionMap.values())) {
if (queryGroupShuttledExpression.equals(viewShuttledExpressionQueryBasedToGroupByExpressionMap.keySet())) {
// return true, if equals directly
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
queryStructInfo.getTopPlan(),
materializationContext.getShuttledExprToScanExprMapping(),
targetToSourceMapping,
true,
queryStructInfo.getTableBitSet()
);
// Can not rewrite, bail out
Expand Down
Loading

0 comments on commit 2b1aa05

Please sign in to comment.