Skip to content

Commit

Permalink
[enhance](mtmv)Improve the performance of obtaining partition/table v…
Browse files Browse the repository at this point in the history
…ersions (apache#39301)

Batch retrieve version information of all tables and partitions used by
MTMV and store it in MTMVRefreshContext
  • Loading branch information
zddr committed Aug 16, 2024
1 parent 824f035 commit b175d19
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 89 deletions.
13 changes: 9 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVVersionSnapshot;
Expand Down Expand Up @@ -2805,14 +2806,18 @@ public List<Column> getPartitionColumns() {
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException {
long visibleVersion = getPartitionOrAnalysisException(partitionName).getVisibleVersion();
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
long visibleVersion = partitionVersions.containsKey(partitionName) ? partitionVersions.get(partitionName)
: getPartitionOrAnalysisException(partitionName).getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion);
}

@Override
public MTMVSnapshotIf getTableSnapshot() {
long visibleVersion = getVisibleVersion();
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) {
Map<Long, Long> tableVersions = context.getBaseVersions().getTableVersions();
long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
Expand Down Expand Up @@ -763,13 +764,14 @@ private HiveMetaStoreCache.HivePartitionValues getHivePartitionValues() {
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException {
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
long partitionLastModifyTime = getPartitionLastModifyTime(partitionName);
return new MTMVTimestampSnapshot(partitionLastModifyTime);
}

@Override
public MTMVSnapshotIf getTableSnapshot() throws AnalysisException {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
if (getPartitionType() == PartitionType.UNPARTITIONED) {
return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
Expand Down Expand Up @@ -177,8 +178,8 @@ public void run() throws JobException {
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVPartitionUtil.alignMvPartition(mtmv);
}
Map<String, Set<String>> partitionMappings = mtmv.calculatePartitionMappings();
this.needRefreshPartitions = calculateNeedRefreshPartitions(partitionMappings);
MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv);
this.needRefreshPartitions = calculateNeedRefreshPartitions(context);
this.refreshMode = generateRefreshMode(needRefreshPartitions);
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
Expand All @@ -196,8 +197,7 @@ public void run() throws JobException {
.subList(start, end > needRefreshPartitions.size() ? needRefreshPartitions.size() : end));
// need get names before exec
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil
.generatePartitionSnapshots(mtmv, relation.getBaseTablesOneLevel(), execPartitionNames,
partitionMappings);
.generatePartitionSnapshots(context, relation.getBaseTablesOneLevel(), execPartitionNames);
exec(ctx, execPartitionNames, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
partitionSnapshots.putAll(execPartitionSnapshots);
Expand Down Expand Up @@ -432,7 +432,7 @@ private MTMVTaskRefreshMode generateRefreshMode(List<String> needRefreshPartitio
}
}

public List<String> calculateNeedRefreshPartitions(Map<String, Set<String>> partitionMappings)
public List<String> calculateNeedRefreshPartitions(MTMVRefreshContext context)
throws AnalysisException {
// check whether the user manually triggers it
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
Expand All @@ -450,9 +450,8 @@ public List<String> calculateNeedRefreshPartitions(Map<String, Set<String>> part
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTablesOneLevel(),
mtmv.getExcludedTriggerTables(),
partitionMappings);
boolean fresh = MTMVPartitionUtil.isMTMVSync(context, relation.getBaseTablesOneLevel(),
mtmv.getExcludedTriggerTables());
if (fresh) {
return Lists.newArrayList();
}
Expand All @@ -462,8 +461,7 @@ public List<String> calculateNeedRefreshPartitions(Map<String, Set<String>> part
}
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTablesOneLevel(),
partitionMappings);
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(context, relation.getBaseTablesOneLevel());
}

public MTMVTaskContext getTaskContext() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mtmv;

import java.util.Map;

public class MTMVBaseVersions {
private final Map<Long, Long> tableVersions;
private final Map<String, Long> partitionVersions;

public MTMVBaseVersions(Map<Long, Long> tableVersions, Map<String, Long> partitionVersions) {
this.tableVersions = tableVersions;
this.partitionVersions = partitionVersions;
}

public Map<Long, Long> getTableVersions() {
return tableVersions;
}

public Map<String, Long> getPartitionVersions() {
return partitionVersions;
}
}
Loading

0 comments on commit b175d19

Please sign in to comment.