Skip to content

Commit

Permalink
Support auto analyze columns that haven't been analyzed for a long time.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jibing-Li committed Oct 24, 2024
1 parent 2bec12e commit e5bf674
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2853,7 +2853,7 @@ public class Config extends ConfigBase {
"Columns that have not been collected within the specified interval will trigger automatic analyze. "
+ "0 means not trigger."
})
public static long auto_analyze_interval_seconds = 0;
public static long auto_analyze_interval_seconds = 86400;

//==========================================================================
// begin of cloud config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* [TABLE]
* [
* WHERE
* [PRIORITY = ["HIGH"|"MID"|"LOW"]]
* [PRIORITY = ["HIGH"|"MID"|"LOW"|"LONG_TIME"]]
* ]
*/
public class ShowAutoAnalyzeJobsStmt extends ShowStmt implements NotFallbackInParser {
Expand Down Expand Up @@ -175,7 +175,7 @@ private void analyzeSubPredicate(Expr subExpr) throws AnalysisException {

if (!valid) {
throw new AnalysisException("Where clause should looks like: "
+ "PRIORITY = \"HIGH|MID|LOW\"");
+ "PRIORITY = \"HIGH|MID|LOW|LONG_TIME\"");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ShowTableStatsStmt extends ShowStmt implements NotFallbackInParser
.add("new_partition")
.add("user_inject")
.add("enable_auto_analyze")
.add("last_analyze_time")
.build();

private static final ImmutableList<String> PARTITION_TITLE_NAMES =
Expand Down Expand Up @@ -229,6 +230,7 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl
row.add("");
row.add("");
row.add("");
row.add("");
row.add(String.valueOf(table.autoAnalyzeEnabled()));
result.add(row);
return new ShowResultSet(getMetaData(), result);
Expand All @@ -242,13 +244,16 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl
LocalDateTime dateTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.updatedTime),
java.time.ZoneId.systemDefault());
String formattedDateTime = dateTime.format(formatter);
row.add(formattedDateTime);
LocalDateTime lastAnalyzeTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.lastAnalyzeTime),
java.time.ZoneId.systemDefault());
row.add(dateTime.format(formatter));
row.add(tableStatistic.analyzeColumns().toString());
row.add(tableStatistic.jobType.toString());
row.add(String.valueOf(tableStatistic.partitionChanged.get()));
row.add(String.valueOf(tableStatistic.userInjected));
row.add(table == null ? "N/A" : String.valueOf(table.autoAnalyzeEnabled()));
row.add(lastAnalyzeTime.format(formatter));
result.add(row);
return new ShowResultSet(getMetaData(), result);
}
Expand Down
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.FollowerColumnSender;
import org.apache.doris.statistics.LongTimeJobAppender;
import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
Expand Down Expand Up @@ -552,6 +553,8 @@ public class Env {

private StatisticsJobAppender statisticsJobAppender;

private LongTimeJobAppender longTimeJobAppender;

private FollowerColumnSender followerColumnSender;

private HiveTransactionMgr hiveTransactionMgr;
Expand Down Expand Up @@ -798,6 +801,7 @@ public Env(boolean isCheckpointCatalog) {
this.statisticsCleaner = new StatisticsCleaner();
this.statisticsAutoCollector = new StatisticsAutoCollector();
this.statisticsJobAppender = new StatisticsJobAppender();
this.longTimeJobAppender = new LongTimeJobAppender();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
Expand Down Expand Up @@ -1849,6 +1853,7 @@ protected void startMasterOnlyDaemonThreads() {
statisticsCleaner.start();
statisticsAutoCollector.start();
statisticsJobAppender.start();
longTimeJobAppender.start();
}

// start threads that should run on all FE
Expand Down Expand Up @@ -6611,6 +6616,10 @@ public StatisticsJobAppender getStatisticsJobAppender() {
return statisticsJobAppender;
}

public LongTimeJobAppender getLongTimeJobAppender() {
return longTimeJobAppender;
}

public void alterMTMVRefreshInfo(AlterMTMVRefreshInfo info) {
AlterMTMV alter = new AlterMTMV(info.getMvName(), info.getRefreshInfo(), MTMVAlterOpType.ALTER_REFRESH_INFO);
this.alter.processAlterMTMV(alter, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -130,6 +131,9 @@ public class AnalysisManager implements Writable {
public final Map<TableName, Set<Pair<String, String>>> midPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<Pair<String, String>>> lowPriorityJobs = new LinkedHashMap<>();

public static final int LONG_TIME_JOB_QUEUE_LIMIT = 10;
private final BlockingQueue<TableIf> longTimeJobs = new ArrayBlockingQueue<>(LONG_TIME_JOB_QUEUE_LIMIT);

// Tracking running manually submitted async tasks, keep in mem only
protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -381,7 +385,7 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) {
}
infoBuilder.setColName(stringJoiner.toString());
infoBuilder.setTaskIds(Lists.newArrayList());
infoBuilder.setTblUpdateTime(System.currentTimeMillis());
infoBuilder.setTblUpdateTime(table.getUpdateTime());
// Empty table row count is 0. Call fetchRowCount() when getRowCount() returns <= 0,
// because getRowCount may return <= 0 if cached is not loaded. This is mainly for external table.
long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 :
Expand Down Expand Up @@ -547,16 +551,36 @@ public List<AutoAnalysisPendingJob> showAutoPendingJobs(ShowAutoAnalyzeJobsStmt
result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName));
result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName));
result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName));
showLongTimeJobs(result);
} else if (priority.equals(JobPriority.HIGH.name())) {
result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName));
} else if (priority.equals(JobPriority.MID.name())) {
result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName));
} else if (priority.equals(JobPriority.LOW.name())) {
result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName));
} else if (priority.equals(JobPriority.LONG_TIME.name())) {
showLongTimeJobs(result);
}
return result;
}

protected void showLongTimeJobs(List<AutoAnalysisPendingJob> result) {
Object[] objects = longTimeJobs.toArray();
for (Object object : objects) {
TableIf table = (TableIf) object;
Set<Pair<String, String>> columnIndexPairs = table.getColumnIndexPairs(
table.getSchemaAllIndexes(false).stream()
.filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(Column::getName).collect(Collectors.toSet()))
.stream()
.filter(p -> !StatisticsUtil.needAnalyzeColumn(table, p))
.filter(p -> StatisticsUtil.columnNotAnalyzedForTooLong(table, p))
.collect(Collectors.toSet());
result.add(new AutoAnalysisPendingJob(table.getDatabase().getCatalog().getName(),
table.getDatabase().getFullName(), table.getName(), columnIndexPairs, JobPriority.LONG_TIME));
}
}

protected List<AutoAnalysisPendingJob> getPendingJobs(Map<TableName, Set<Pair<String, String>>> jobMap,
JobPriority priority, TableName tblName) {
List<AutoAnalysisPendingJob> result = Lists.newArrayList();
Expand Down Expand Up @@ -1482,4 +1506,33 @@ public void mergeFollowerQueryColumns(Collection<TQueryColumn> highColumns,
}
}
}

public void appendToLongTimeJobs(TableIf table) throws InterruptedException {
Object[] objects = longTimeJobs.toArray();
String catalogName = table.getDatabase().getCatalog().getName();
String dbName = table.getDatabase().getFullName();
String tableName = table.getName();
for (Object o : objects) {
TableIf queuedTbl = (TableIf) o;
try {
if (catalogName.equals(queuedTbl.getDatabase().getCatalog().getName())
&& dbName.equals(queuedTbl.getDatabase().getFullName())
&& tableName.equals(queuedTbl.getName())) {
return;
}
} catch (Exception e) {
LOG.warn("Failed to compare long time job for table {}, id {}", table.getName(), table.getId());
}
}
longTimeJobs.put(table);
}

public TableIf getLongTimeJob() {
try {
return longTimeJobs.poll(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public enum JobPriority {
HIGH,
MID,
LOW,
LONG_TIME,
MANUAL;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class LongTimeJobAppender extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(LongTimeJobAppender.class);

public LongTimeJobAppender() {
super("LongTimeJobAppender", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
}

@Override
protected void runAfterCatalogReady() {
if (!Env.getCurrentEnv().isMaster()) {
return;
}
if (!StatisticsUtil.statsTblAvailable()) {
LOG.info("Stats table not available, skip");
return;
}
if (Env.getCurrentEnv().getStatisticsAutoCollector() == null
|| !Env.getCurrentEnv().getStatisticsAutoCollector().isReady()) {
LOG.info("Statistics auto collector not ready, skip");
return;
}
if (Env.isCheckpointThread()) {
return;
}
if (!StatisticsUtil.canCollect()) {
LOG.debug("Auto analyze not enabled or not in analyze time range.");
return;
}
traverseAllTables();
}

protected void traverseAllTables() {
List<CatalogIf> catalogs = getCatalogsInOrder();
AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager();
for (CatalogIf ctl : catalogs) {
if (!StatisticsUtil.canCollect()) {
break;
}
if (!ctl.enableAutoAnalyze()) {
continue;
}
List<DatabaseIf> dbs = getDatabasesInOrder(ctl);
for (DatabaseIf<TableIf> db : dbs) {
if (!StatisticsUtil.canCollect()) {
break;
}
if (StatisticConstants.SYSTEM_DBS.contains(db.getFullName())) {
continue;
}
for (TableIf table : getTablesInOrder(db)) {
try {
if (skip(table)) {
continue;
}
TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId());
if (tblStats == null) {
continue;
}
Set<Pair<String, String>> columns = tblStats.analyzeColumns();
if (columns == null || columns.isEmpty()) {
continue;
}
for (Pair<String, String> columnPair : columns) {
if (StatisticsUtil.columnNotAnalyzedForTooLong(table, columnPair)) {
analysisManager.appendToLongTimeJobs(table);
break;
}
}
} catch (Throwable t) {
LOG.warn("Failed to analyze table {}.{}.{}",
db.getCatalog().getName(), db.getFullName(), table.getName(), t);
}
}
}
}
}

public List<CatalogIf> getCatalogsInOrder() {
return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream()
.sorted((c1, c2) -> (int) (c1.getId() - c2.getId())).collect(Collectors.toList());
}

public List<DatabaseIf<? extends TableIf>> getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
return catalog.getAllDbs().stream()
.sorted((d1, d2) -> (int) (d1.getId() - d2.getId())).collect(Collectors.toList());
}

public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) {
return db.getTables().stream()
.sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList());
}

// return true if skip auto analyze this time.
protected boolean skip(TableIf table) {
if (!(table instanceof OlapTable || table instanceof HMSExternalTable)) {
return true;
}
// For now, only support Hive HMS table auto collection.
if (table instanceof HMSExternalTable
&& !((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
return true;
}
// Skip wide table.
return table.getBaseSchema().size() > StatisticsUtil.getAutoAnalyzeTableWidthThreshold();
}
}
Loading

0 comments on commit e5bf674

Please sign in to comment.