diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index d7ac361d577efd..b20433fdf68024 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -50,6 +50,7 @@ import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVVersionSnapshot; @@ -2992,14 +2993,18 @@ public List getPartitionColumns() { } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException { - long visibleVersion = getPartitionOrAnalysisException(partitionName).getVisibleVersion(); + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) + throws AnalysisException { + Map partitionVersions = context.getBaseVersions().getPartitionVersions(); + long visibleVersion = partitionVersions.containsKey(partitionName) ? partitionVersions.get(partitionName) + : getPartitionOrAnalysisException(partitionName).getVisibleVersion(); return new MTMVVersionSnapshot(visibleVersion); } @Override - public MTMVSnapshotIf getTableSnapshot() { - long visibleVersion = getVisibleVersion(); + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) { + Map tableVersions = context.getBaseVersions().getTableVersions(); + long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion(); return new MTMVVersionSnapshot(visibleVersion); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index e1afbf3dc0bdd9..a9f2da13b4093b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -32,6 +32,7 @@ import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; +import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVTimestampSnapshot; @@ -748,13 +749,14 @@ private HiveMetaStoreCache.HivePartitionValues getHivePartitionValues() { } @Override - public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException { + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) + throws AnalysisException { long partitionLastModifyTime = getPartitionLastModifyTime(partitionName); return new MTMVTimestampSnapshot(partitionLastModifyTime); } @Override - public MTMVSnapshotIf getTableSnapshot() throws AnalysisException { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException { if (getPartitionType() == PartitionType.UNPARTITIONED) { return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 5c48649bf36b0a..d53994fc441caf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -36,6 +36,7 @@ import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import org.apache.doris.mtmv.MTMVPartitionUtil; import org.apache.doris.mtmv.MTMVPlanUtil; +import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot; import org.apache.doris.mtmv.MTMVRelation; @@ -176,8 +177,8 @@ public void run() throws JobException { if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { MTMVPartitionUtil.alignMvPartition(mtmv); } - Map> partitionMappings = mtmv.calculatePartitionMappings(); - this.needRefreshPartitions = calculateNeedRefreshPartitions(partitionMappings); + MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv); + this.needRefreshPartitions = calculateNeedRefreshPartitions(context); this.refreshMode = generateRefreshMode(needRefreshPartitions); if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) { return; @@ -195,8 +196,7 @@ public void run() throws JobException { .subList(start, end > needRefreshPartitions.size() ? needRefreshPartitions.size() : end)); // need get names before exec Map execPartitionSnapshots = MTMVPartitionUtil - .generatePartitionSnapshots(mtmv, relation.getBaseTablesOneLevel(), execPartitionNames, - partitionMappings); + .generatePartitionSnapshots(context, relation.getBaseTablesOneLevel(), execPartitionNames); exec(ctx, execPartitionNames, tableWithPartKey); completedPartitions.addAll(execPartitionNames); partitionSnapshots.putAll(execPartitionSnapshots); @@ -424,7 +424,7 @@ private MTMVTaskRefreshMode generateRefreshMode(List needRefreshPartitio } } - public List calculateNeedRefreshPartitions(Map> partitionMappings) + public List calculateNeedRefreshPartitions(MTMVRefreshContext context) throws AnalysisException { // check whether the user manually triggers it if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) { @@ -442,9 +442,8 @@ public List calculateNeedRefreshPartitions(Map> part // check if data is fresh // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId - boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTablesOneLevel(), - mtmv.getExcludedTriggerTables(), - partitionMappings); + boolean fresh = MTMVPartitionUtil.isMTMVSync(context, relation.getBaseTablesOneLevel(), + mtmv.getExcludedTriggerTables()); if (fresh) { return Lists.newArrayList(); } @@ -454,8 +453,7 @@ public List calculateNeedRefreshPartitions(Map> part } // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId - return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTablesOneLevel(), - partitionMappings); + return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(context, relation.getBaseTablesOneLevel()); } public MTMVTaskContext getTaskContext() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVBaseVersions.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVBaseVersions.java new file mode 100644 index 00000000000000..7f83389a953ccf --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVBaseVersions.java @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import java.util.Map; + +public class MTMVBaseVersions { + private final Map tableVersions; + private final Map partitionVersions; + + public MTMVBaseVersions(Map tableVersions, Map partitionVersions) { + this.tableVersions = tableVersions; + this.partitionVersions = partitionVersions; + } + + public Map getTableVersions() { + return tableVersions; + } + + public Map getPartitionVersions() { + return partitionVersions; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index b07ca6ad1d10bf..e6a89007310d6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -26,13 +26,16 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.rpc.RpcException; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -71,17 +74,18 @@ public class MTMVPartitionUtil { /** * Determine whether the partition is sync with retated partition and other baseTables * - * @param mtmv + * @param refreshContext * @param partitionName - * @param relatedPartitionNames * @param tables * @param excludedTriggerTables * @return * @throws AnalysisException */ - public static boolean isMTMVPartitionSync(MTMV mtmv, String partitionName, Set relatedPartitionNames, + public static boolean isMTMVPartitionSync(MTMVRefreshContext refreshContext, String partitionName, Set tables, Set excludedTriggerTables) throws AnalysisException { + MTMV mtmv = refreshContext.getMtmv(); + Set relatedPartitionNames = refreshContext.getPartitionMappings().get(partitionName); boolean isSyncWithPartition = true; if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); @@ -92,9 +96,10 @@ public static boolean isMTMVPartitionSync(MTMV mtmv, String partitionName, Set tables, Set excludeTables, - Map> partitionMappings) + public static boolean isMTMVSync(MTMVRefreshContext context, Set tables, Set excludeTables) throws AnalysisException { + MTMV mtmv = context.getMtmv(); Set partitionNames = mtmv.getPartitionNames(); for (String partitionName : partitionNames) { - if (!isMTMVPartitionSync(mtmv, partitionName, partitionMappings.get(partitionName), tables, + if (!isMTMVPartitionSync(context, partitionName, tables, excludeTables)) { return false; } @@ -234,17 +238,18 @@ public static boolean isMTMVSync(MTMV mtmv, Set tables, Set> getPartitionsUnSyncTables(MTMV mtmv, List partitionIds) throws AnalysisException { Map> res = Maps.newHashMap(); - Map> partitionMappings = mtmv.calculatePartitionMappings(); + MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv); for (Long partitionId : partitionIds) { String partitionName = mtmv.getPartitionOrAnalysisException(partitionId).getName(); - res.put(partitionId, getPartitionUnSyncTables(mtmv, partitionName, partitionMappings.get(partitionName))); + res.put(partitionId, getPartitionUnSyncTables(context, partitionName)); } return res; } - private static List getPartitionUnSyncTables(MTMV mtmv, String partitionName, - Set relatedPartitionNames) + private static List getPartitionUnSyncTables(MTMVRefreshContext context, String partitionName) throws AnalysisException { + MTMV mtmv = context.getMtmv(); + Set relatedPartitionNames = context.getPartitionMappings().get(partitionName); List res = Lists.newArrayList(); for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTablesOneLevel()) { TableIf table = MTMVUtil.getTable(baseTableInfo); @@ -262,13 +267,13 @@ private static List getPartitionUnSyncTables(MTMV mtmv, String partition res.add(mtmvRelatedTableIf.getName()); continue; } - boolean isSyncWithPartition = isSyncWithPartitions(mtmv, partitionName, mtmvRelatedTableIf, + boolean isSyncWithPartition = isSyncWithPartitions(context, partitionName, relatedPartitionNames); if (!isSyncWithPartition) { res.add(mtmvRelatedTableIf.getName()); } } else { - if (!isSyncWithBaseTable(mtmv, partitionName, baseTableInfo)) { + if (!isSyncWithBaseTable(context, partitionName, baseTableInfo)) { res.add(table.getName()); } } @@ -279,17 +284,17 @@ private static List getPartitionUnSyncTables(MTMV mtmv, String partition /** * Get the partitions that need to be refreshed * - * @param mtmv + * @param context * @param baseTables * @return */ - public static List getMTMVNeedRefreshPartitions(MTMV mtmv, Set baseTables, - Map> partitionMappings) { + public static List getMTMVNeedRefreshPartitions(MTMVRefreshContext context, Set baseTables) { + MTMV mtmv = context.getMtmv(); Set partitionNames = mtmv.getPartitionNames(); List res = Lists.newArrayList(); for (String partitionName : partitionNames) { try { - if (!isMTMVPartitionSync(mtmv, partitionName, partitionMappings.get(partitionName), baseTables, + if (!isMTMVPartitionSync(context, partitionName, baseTables, mtmv.getExcludedTriggerTables())) { res.add(partitionName); } @@ -304,16 +309,16 @@ public static List getMTMVNeedRefreshPartitions(MTMV mtmv, Set relatedPartitionNames) throws AnalysisException { + MTMV mtmv = context.getMtmv(); + MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); if (!relatedTable.needAutoRefresh()) { return true; } @@ -324,7 +329,7 @@ public static boolean isSyncWithPartitions(MTMV mtmv, String mtmvPartitionName, } for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName); + .getPartitionSnapshot(relatedPartitionName, context); if (!mtmv.getRefreshSnapshot() .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot)) { @@ -398,7 +403,8 @@ private static void addPartition(MTMV mtmv, PartitionKeyDesc oldPartitionKeyDesc * @param excludedTriggerTables * @return */ - private static boolean isSyncWithAllBaseTables(MTMV mtmv, String mtmvPartitionName, Set tables, + private static boolean isSyncWithAllBaseTables(MTMVRefreshContext context, String mtmvPartitionName, + Set tables, Set excludedTriggerTables) throws AnalysisException { for (BaseTableInfo baseTableInfo : tables) { TableIf table = null; @@ -411,7 +417,7 @@ private static boolean isSyncWithAllBaseTables(MTMV mtmv, String mtmvPartitionNa if (excludedTriggerTables.contains(table.getName())) { continue; } - boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionName, baseTableInfo); + boolean syncWithBaseTable = isSyncWithBaseTable(context, mtmvPartitionName, baseTableInfo); if (!syncWithBaseTable) { return false; } @@ -419,8 +425,10 @@ private static boolean isSyncWithAllBaseTables(MTMV mtmv, String mtmvPartitionNa return true; } - private static boolean isSyncWithBaseTable(MTMV mtmv, String mtmvPartitionName, BaseTableInfo baseTableInfo) + private static boolean isSyncWithBaseTable(MTMVRefreshContext context, String mtmvPartitionName, + BaseTableInfo baseTableInfo) throws AnalysisException { + MTMV mtmv = context.getMtmv(); TableIf table = null; try { table = MTMVUtil.getTable(baseTableInfo); @@ -438,7 +446,7 @@ private static boolean isSyncWithBaseTable(MTMV mtmv, String mtmvPartitionName, if (!baseTable.needAutoRefresh()) { return true; } - MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(); + MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context); return mtmv.getRefreshSnapshot() .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot); } @@ -446,35 +454,35 @@ private static boolean isSyncWithBaseTable(MTMV mtmv, String mtmvPartitionName, /** * Generate updated snapshots of partitions to determine if they are synchronized * - * @param mtmv + * @param context * @param baseTables * @param partitionNames - * @param partitionMappings * @return * @throws AnalysisException */ - public static Map generatePartitionSnapshots(MTMV mtmv, - Set baseTables, Set partitionNames, - Map> partitionMappings) + public static Map generatePartitionSnapshots(MTMVRefreshContext context, + Set baseTables, Set partitionNames) throws AnalysisException { Map res = Maps.newHashMap(); for (String partitionName : partitionNames) { res.put(partitionName, - generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionName))); + generatePartitionSnapshot(context, baseTables, + context.getPartitionMappings().get(partitionName))); } return res; } - private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, + private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefreshContext context, Set baseTables, Set relatedPartitionNames) throws AnalysisException { + MTMV mtmv = context.getMtmv(); MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot(); if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf partitionSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName); + .getPartitionSnapshot(relatedPartitionName, context); refreshPartitionSnapshot.getPartitions() .put(relatedPartitionName, partitionSnapshot); } @@ -488,7 +496,8 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, if (!(table instanceof MTMVRelatedTableIf)) { continue; } - refreshPartitionSnapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot()); + refreshPartitionSnapshot.getTables() + .put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot(context)); } return refreshPartitionSnapshot; } @@ -502,4 +511,57 @@ public static Type getPartitionColumnType(MTMVRelatedTableIf relatedTable, Strin } throw new AnalysisException("can not getPartitionColumnType by:" + col); } + + public static MTMVBaseVersions getBaseVersions(MTMV mtmv) throws AnalysisException { + return new MTMVBaseVersions(getTableVersions(mtmv), getPartitionVersions(mtmv)); + } + + private static Map getPartitionVersions(MTMV mtmv) throws AnalysisException { + Map res = Maps.newHashMap(); + if (mtmv.getMvPartitionInfo().getPartitionType().equals(MTMVPartitionType.SELF_MANAGE)) { + return res; + } + MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); + if (!(relatedTable instanceof OlapTable)) { + return res; + } + List partitions = Lists.newArrayList(((OlapTable) relatedTable).getPartitions()); + List versions = null; + try { + versions = Partition.getVisibleVersions(partitions); + } catch (RpcException e) { + throw new AnalysisException("getVisibleVersions failed.", e); + } + Preconditions.checkState(partitions.size() == versions.size()); + for (int i = 0; i < partitions.size(); i++) { + res.put(partitions.get(i).getName(), versions.get(i)); + } + return res; + } + + private static Map getTableVersions(MTMV mtmv) { + Map res = Maps.newHashMap(); + if (mtmv.getRelation() == null || mtmv.getRelation().getBaseTablesOneLevel() == null) { + return res; + } + List olapTables = Lists.newArrayList(); + for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTablesOneLevel()) { + TableIf table = null; + try { + table = MTMVUtil.getTable(baseTableInfo); + } catch (AnalysisException e) { + LOG.info(e); + continue; + } + if (table instanceof OlapTable) { + olapTables.add((OlapTable) table); + } + } + List versions = OlapTable.getVisibleVersionInBatch(olapTables); + Preconditions.checkState(olapTables.size() == versions.size()); + for (int i = 0; i < olapTables.size(); i++) { + res.put(olapTables.get(i).getId(), versions.get(i)); + } + return res; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshContext.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshContext.java new file mode 100644 index 00000000000000..3d611b5e8527e6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshContext.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.catalog.MTMV; +import org.apache.doris.common.AnalysisException; + +import java.util.Map; +import java.util.Set; + +public class MTMVRefreshContext { + private MTMV mtmv; + private Map> partitionMappings; + private MTMVBaseVersions baseVersions; + + public MTMVRefreshContext(MTMV mtmv) { + this.mtmv = mtmv; + } + + public MTMV getMtmv() { + return mtmv; + } + + public Map> getPartitionMappings() { + return partitionMappings; + } + + public MTMVBaseVersions getBaseVersions() { + return baseVersions; + } + + public static MTMVRefreshContext buildContext(MTMV mtmv) throws AnalysisException { + MTMVRefreshContext context = new MTMVRefreshContext(mtmv); + context.partitionMappings = mtmv.calculatePartitionMappings(); + context.baseVersions = MTMVPartitionUtil.getBaseVersions(mtmv); + return context; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index c34df750de5bdc..516eb904e58463 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -69,7 +69,7 @@ public interface MTMVRelatedTableIf extends TableIf { * @return partition snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException; + MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) throws AnalysisException; /** * getTableSnapshot @@ -77,7 +77,7 @@ public interface MTMVRelatedTableIf extends TableIf { * @return table snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getTableSnapshot() throws AnalysisException; + MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException; /** * Does the current type of table allow timed triggering diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java index 5392313ba62a19..5e17673a06883f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java @@ -31,8 +31,6 @@ import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.Set; public class MTMVRewriteUtil { private static final Logger LOG = LogManager.getLogger(MTMVRewriteUtil.class); @@ -57,7 +55,7 @@ public static Collection getMTMVCanRewritePartitions(MTMV mtmv, Conne || mtmv.getStatus().getRefreshState() == MTMVRefreshState.INIT) { return res; } - Map> partitionMappings = null; + MTMVRefreshContext refreshContext = null; // check gracePeriod long gracePeriodMills = mtmv.getGracePeriod(); for (Partition partition : allPartitions) { @@ -67,11 +65,11 @@ public static Collection getMTMVCanRewritePartitions(MTMV mtmv, Conne continue; } try { - if (partitionMappings == null) { - partitionMappings = mtmv.calculatePartitionMappings(); + if (refreshContext == null) { + refreshContext = MTMVRefreshContext.buildContext(mtmv); } - if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getName(), - partitionMappings.get(partition.getName()), mtmvRelation.getBaseTablesOneLevel(), + if (MTMVPartitionUtil.isMTMVPartitionSync(refreshContext, partition.getName(), + mtmvRelation.getBaseTablesOneLevel(), Sets.newHashSet())) { res.add(partition); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index d6c4a87f224ca4..63a75c724988ed 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -26,6 +26,7 @@ import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import mockit.Expectations; import mockit.Mocked; @@ -55,6 +56,10 @@ public class MTMVPartitionUtilTest { private MTMVRefreshSnapshot refreshSnapshot; @Mocked private MTMVUtil mtmvUtil; + @Mocked + private MTMVRefreshContext context; + @Mocked + private MTMVBaseVersions versions; private Set baseTables = Sets.newHashSet(); @@ -67,6 +72,18 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = relation; + context.getMtmv(); + minTimes = 0; + result = mtmv; + + context.getPartitionMappings(); + minTimes = 0; + result = Maps.newHashMap(); + + context.getBaseVersions(); + minTimes = 0; + result = versions; + mtmv.getPartitions(); minTimes = 0; result = Lists.newArrayList(p1); @@ -95,7 +112,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = true; - baseOlapTable.getTableSnapshot(); + baseOlapTable.getTableSnapshot((MTMVRefreshContext) any); minTimes = 0; result = baseSnapshotIf; @@ -115,7 +132,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = true; - baseOlapTable.getPartitionSnapshot(anyString); + baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any); minTimes = 0; result = baseSnapshotIf; @@ -152,7 +169,7 @@ public void testIsMTMVSyncNotSync() { @Test public void testIsSyncWithPartition() throws AnalysisException { boolean isSyncWithPartition = MTMVPartitionUtil - .isSyncWithPartitions(mtmv, "name1", baseOlapTable, Sets.newHashSet("name2")); + .isSyncWithPartitions(context, "name1", Sets.newHashSet("name2")); Assert.assertTrue(isSyncWithPartition); } @@ -166,7 +183,7 @@ public void testIsSyncWithPartitionNotEqual() throws AnalysisException { } }; boolean isSyncWithPartition = MTMVPartitionUtil - .isSyncWithPartitions(mtmv, "name1", baseOlapTable, Sets.newHashSet("name2")); + .isSyncWithPartitions(context, "name1", Sets.newHashSet("name2")); Assert.assertFalse(isSyncWithPartition); } @@ -180,7 +197,7 @@ public void testIsSyncWithPartitionNotSync() throws AnalysisException { } }; boolean isSyncWithPartition = MTMVPartitionUtil - .isSyncWithPartitions(mtmv, "name1", baseOlapTable, Sets.newHashSet("name2")); + .isSyncWithPartitions(context, "name1", Sets.newHashSet("name2")); Assert.assertFalse(isSyncWithPartition); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java index 2b8c16509af5c9..c2e402adb82ab8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java @@ -103,7 +103,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = true; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext) any, anyString, (Set) any, (Set) any); minTimes = 0; @@ -124,7 +124,7 @@ public void testGetMTMVCanRewritePartitionsForceConsistent() throws AnalysisExce minTimes = 0; result = 2L; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext) any, anyString, (Set) any, (Set) any); minTimes = 0; @@ -154,7 +154,7 @@ public void testGetMTMVCanRewritePartitionsInGracePeriod() throws AnalysisExcept minTimes = 0; result = 2L; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext) any, anyString, (Set) any, (Set) any); minTimes = 0; @@ -175,7 +175,7 @@ public void testGetMTMVCanRewritePartitionsNotInGracePeriod() throws AnalysisExc minTimes = 0; result = 1L; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext) any, anyString, (Set) any, (Set) any); minTimes = 0; @@ -208,7 +208,7 @@ public void testGetMTMVCanRewritePartitionsDisableMaterializedViewRewrite() { public void testGetMTMVCanRewritePartitionsNotSync() throws AnalysisException { new Expectations() { { - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMVRefreshContext) any, anyString, (Set) any, (Set) any); minTimes = 0; diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java index 512bd6099f025b..0bcd2f0569097d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java @@ -28,7 +28,6 @@ import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import mockit.Expectations; import mockit.Mocked; @@ -38,7 +37,6 @@ import org.junit.Test; import java.util.List; -import java.util.Map; import java.util.Set; public class MTMVTaskTest { @@ -84,8 +82,7 @@ public void setUp() // minTimes = 0; // result = poneId; - mtmvPartitionUtil.isMTMVSync(mtmv, (Set) any, (Set) any, - (Map>) any); + mtmvPartitionUtil.isMTMVSync((MTMVRefreshContext) any, (Set) any, (Set) any); minTimes = 0; result = true; @@ -104,7 +101,7 @@ public void setUp() public void testCalculateNeedRefreshPartitionsManualComplete() throws AnalysisException { MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, null, true); MTMVTask task = new MTMVTask(mtmv, relation, context); - List result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List result = task.calculateNeedRefreshPartitions(null); Assert.assertEquals(allPartitionNames, result); } @@ -112,7 +109,7 @@ public void testCalculateNeedRefreshPartitionsManualComplete() throws AnalysisEx public void testCalculateNeedRefreshPartitionsManualPartitions() throws AnalysisException { MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), false); MTMVTask task = new MTMVTask(mtmv, relation, context); - List result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List result = task.calculateNeedRefreshPartitions(null); Assert.assertEquals(Lists.newArrayList(poneName), result); } @@ -127,7 +124,7 @@ public void testCalculateNeedRefreshPartitionsSystem() throws AnalysisException }; MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List result = task.calculateNeedRefreshPartitions(null); Assert.assertTrue(CollectionUtils.isEmpty(result)); } @@ -135,7 +132,7 @@ public void testCalculateNeedRefreshPartitionsSystem() throws AnalysisException public void testCalculateNeedRefreshPartitionsSystemComplete() throws AnalysisException { MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List result = task.calculateNeedRefreshPartitions(null); Assert.assertEquals(allPartitionNames, result); } @@ -143,15 +140,14 @@ public void testCalculateNeedRefreshPartitionsSystemComplete() throws AnalysisEx public void testCalculateNeedRefreshPartitionsSystemNotSyncComplete() throws AnalysisException { new Expectations() { { - mtmvPartitionUtil.isMTMVSync(mtmv, (Set) any, (Set) any, - (Map>) any); + mtmvPartitionUtil.isMTMVSync((MTMVRefreshContext) any, (Set) any, (Set) any); minTimes = 0; result = false; } }; MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List result = task.calculateNeedRefreshPartitions(null); Assert.assertEquals(allPartitionNames, result); } @@ -160,7 +156,7 @@ public void testCalculateNeedRefreshPartitionsSystemNotSyncAuto() throws Analysi new Expectations() { { mtmvPartitionUtil - .isMTMVSync(mtmv, (Set) any, (Set) any, (Map>) any); + .isMTMVSync((MTMVRefreshContext) any, (Set) any, (Set) any); minTimes = 0; result = false; @@ -169,14 +165,14 @@ public void testCalculateNeedRefreshPartitionsSystemNotSyncAuto() throws Analysi result = RefreshMethod.AUTO; mtmvPartitionUtil - .getMTMVNeedRefreshPartitions(mtmv, (Set) any, (Map>) any); + .getMTMVNeedRefreshPartitions((MTMVRefreshContext) any, (Set) any); minTimes = 0; result = Lists.newArrayList(ptwoName); } }; MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List result = task.calculateNeedRefreshPartitions(null); Assert.assertEquals(Lists.newArrayList(ptwoName), result); } }