[enhancement](nereids) speedup sql cache with variable (apache#37090)
1. Currently, when the variable changes, the SQL cache misses in the FE and the query falls back to the regular planning path, which causes low QPS:
```sql
select *
from tbl
where dt = @dt_var  -- the variable's value may range over about 10 days
```

This PR tries to hit the SQL cache even when the variable has changed (see the first sketch after this list).

Before this optimization: single-thread QPS is 300.
After this optimization: single-thread QPS is 3000.

2. Refactor: use the `ComputeResultSet` interface to return the result set when the FE can compute it directly (see the second sketch below).
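
The first sketch below is a minimal, self-contained illustration of point 1, not the actual FE classes: instead of invalidating the cache entry when a used user variable changes, the lookup re-resolves the current variable values and folds them into the MD5 cache key, so each distinct value maps to its own BE cache entry. `VariableAwareCacheKey` and its parameters are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

// Hypothetical helper, for illustration only; the real logic lives in
// SqlCacheContext.doComputeCacheKeyMd5 and NereidsSqlCacheManager.tryParseSql.
final class VariableAwareCacheKey {

    // Build the cache key from the original SQL plus the *current* values of the
    // user variables referenced by the cached plan, then hash it with MD5.
    static byte[] compute(String originSql, Map<String, String> currentVariableValues)
            throws NoSuchAlgorithmException {
        StringBuilder cacheKey = new StringBuilder(originSql);
        for (Map.Entry<String, String> variable : currentVariableValues.entrySet()) {
            cacheKey.append("|").append(variable.getKey()).append("=").append(variable.getValue());
        }
        return MessageDigest.getInstance("MD5")
                .digest(cacheKey.toString().getBytes(StandardCharsets.UTF_8));
    }
}
```

In the real code the key also mixes in used views, folded non-deterministic expressions, and row/data-mask policies; only the variable part is shown here.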
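
The second sketch illustrates point 2 with simplified stand-in types (the real interface is `ComputeResultSet.computeResultInFe(CascadesContext, Optional<SqlCacheContext>)` returning `Optional<ResultSet>`): a physical plan that already knows its rows can hand them back to the FE, so no fragment is sent to the BE.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Simplified stand-ins for the Doris types, for illustration only.
interface ComputeResultSetSketch {
    // Empty means "cannot be answered in the FE, fall back to sending fragments to the BE".
    Optional<List<List<String>>> computeResultInFe();
}

// A plan like `select 100` already carries its single row of literals,
// so the FE can return it immediately.
final class OneRowRelationSketch implements ComputeResultSetSketch {
    private final List<String> literalRow;

    OneRowRelationSketch(List<String> literalRow) {
        this.literalRow = literalRow;
    }

    @Override
    public Optional<List<List<String>>> computeResultInFe() {
        return Optional.of(Collections.singletonList(literalRow));
    }
}

// A plan like `select * from tbl limit 0` returns no rows at all.
final class EmptyRelationSketch implements ComputeResultSetSketch {
    @Override
    public Optional<List<List<String>>> computeResultInFe() {
        return Optional.of(Collections.<List<String>>emptyList());
    }
}
```

With this shape, `handleQueryInFe` only needs one generic branch, `if (physicalPlan instanceof ComputeResultSet) { ... }`, as in the diff below.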
924060929 authored and dataroaring committed Jul 17, 2024
1 parent a230b56 commit d815b7a
Showing 10 changed files with 277 additions and 143 deletions.
@@ -48,6 +48,7 @@
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.ConnectContext;
@@ -58,6 +59,7 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;

import java.lang.reflect.Field;
@@ -124,16 +126,14 @@ public void tryAddFeSqlCache(ConnectContext connectContext, String sql) {
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity.toString() + ":" + sql.trim();
if ((sqlCaches.getIfPresent(key) == null) && sqlCacheContext.getOrComputeCacheKeyMd5() != null
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null
&& sqlCacheContext.getResultSetInFe().isPresent()) {
sqlCaches.put(key, sqlCacheContext);
}
}

/** tryAddCache */
public void tryAddCache(
ConnectContext connectContext, String sql,
CacheAnalyzer analyzer, boolean currentMissParseSqlFromSqlCache) {
/** tryAddBeCache */
public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyzer analyzer) {
Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
if (!sqlCacheContextOpt.isPresent()) {
return;
@@ -144,8 +144,7 @@ public void tryAddCache(
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity.toString() + ":" + sql.trim();
if ((currentMissParseSqlFromSqlCache || sqlCaches.getIfPresent(key) == null)
&& sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
SqlCache cache = (SqlCache) analyzer.getCache();
sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum());
sqlCacheContext.setLatestPartitionId(cache.getLatestId());
@@ -183,9 +182,6 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
if (viewsChanged(env, sqlCacheContext)) {
return invalidateCache(key);
}
if (usedVariablesChanged(sqlCacheContext)) {
return invalidateCache(key);
}

LogicalEmptyRelation whateverPlan = new LogicalEmptyRelation(new RelationId(0), ImmutableList.of());
if (nondeterministicFunctionChanged(whateverPlan, connectContext, sqlCacheContext)) {
@@ -202,7 +198,10 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri

try {
Optional<ResultSet> resultSetInFe = sqlCacheContext.getResultSetInFe();
if (resultSetInFe.isPresent()) {

List<Variable> currentVariables = resolveUserVariables(sqlCacheContext);
boolean usedVariablesChanged = usedVariablesChanged(currentVariables, sqlCacheContext);
if (resultSetInFe.isPresent() && !usedVariablesChanged) {
MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);

String cachedPlan = sqlCacheContext.getPhysicalPlan();
@@ -215,7 +214,9 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
}

Status status = new Status();
PUniqueId cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5();
PUniqueId cacheKeyMd5 = usedVariablesChanged
? sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables))
: sqlCacheContext.getOrComputeCacheKeyMd5();
InternalService.PFetchCacheResult cacheData =
SqlCache.getCacheData(sqlCacheContext.getCacheProxy(),
cacheKeyMd5, sqlCacheContext.getLatestPartitionId(),
@@ -236,7 +237,7 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
);
return Optional.of(logicalSqlCache);
}
return invalidateCache(key);
return Optional.empty();
} catch (Throwable t) {
return invalidateCache(key);
}
@@ -343,12 +344,24 @@ private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCach
return false;
}

private boolean usedVariablesChanged(SqlCacheContext sqlCacheContext) {
for (Variable variable : sqlCacheContext.getUsedVariables()) {
private List<Variable> resolveUserVariables(SqlCacheContext sqlCacheContext) {
List<Variable> cachedUsedVariables = sqlCacheContext.getUsedVariables();
List<Variable> currentVariables = Lists.newArrayListWithCapacity(cachedUsedVariables.size());
for (Variable cachedVariable : cachedUsedVariables) {
Variable currentVariable = ExpressionAnalyzer.resolveUnboundVariable(
new UnboundVariable(variable.getName(), variable.getType()));
if (!Objects.equals(currentVariable, variable)
|| variable.getRealExpression().anyMatch(Nondeterministic.class::isInstance)) {
new UnboundVariable(cachedVariable.getName(), cachedVariable.getType()));
currentVariables.add(currentVariable);
}
return currentVariables;
}

private boolean usedVariablesChanged(List<Variable> currentVariables, SqlCacheContext sqlCacheContext) {
List<Variable> cachedUsedVariables = sqlCacheContext.getUsedVariables();
for (int i = 0; i < cachedUsedVariables.size(); i++) {
Variable currentVariable = currentVariables.get(i);
Variable cachedVariable = cachedUsedVariables.get(i);
if (!Objects.equals(currentVariable, cachedVariable)
|| cachedVariable.getRealExpression().anyMatch(Nondeterministic.class::isInstance)) {
return true;
}
}
@@ -19,10 +19,8 @@

import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
@@ -47,19 +45,16 @@
import org.apache.doris.nereids.processor.pre.PlanPreprocessors;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.exploration.mv.MaterializationContext;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.ComputeResultSet;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
@@ -74,10 +69,8 @@
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.cache.CacheAnalyzer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -604,63 +597,17 @@ public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
if (!(parsedStmt instanceof LogicalPlanAdapter)) {
return Optional.empty();
}
if (physicalPlan instanceof PhysicalSqlCache
&& ((PhysicalSqlCache) physicalPlan).getResultSet().isPresent()) {
return Optional.of(((PhysicalSqlCache) physicalPlan).getResultSet().get());
}
if (!(physicalPlan instanceof PhysicalResultSink)) {
return Optional.empty();
}

Optional<SqlCacheContext> sqlCacheContext = statementContext.getSqlCacheContext();
boolean enableSqlCache
= CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable());
Plan child = physicalPlan.child(0);
if (child instanceof PhysicalOneRowRelation) {
PhysicalOneRowRelation physicalOneRowRelation = (PhysicalOneRowRelation) physicalPlan.child(0);
List<Column> columns = Lists.newArrayList();
List<String> data = Lists.newArrayList();
for (int i = 0; i < physicalOneRowRelation.getProjects().size(); i++) {
NamedExpression item = physicalOneRowRelation.getProjects().get(i);
NamedExpression output = physicalPlan.getOutput().get(i);
Expression expr = item.child(0);
if (expr instanceof Literal) {
LiteralExpr legacyExpr = ((Literal) expr).toLegacyLiteral();
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
data.add(legacyExpr.getStringValueInFe());
} else {
return Optional.empty();
}
}

ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
if (sqlCacheContext.isPresent() && enableSqlCache) {
sqlCacheContext.get().setResultSetInFe(resultSet);
Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
statementContext.getConnectContext(),
statementContext.getOriginStatement().originStmt
);
}
return Optional.of(resultSet);
} else if (child instanceof PhysicalEmptyRelation) {
List<Column> columns = Lists.newArrayList();
for (int i = 0; i < physicalPlan.getOutput().size(); i++) {
NamedExpression output = physicalPlan.getOutput().get(i);
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
if (physicalPlan instanceof ComputeResultSet) {
Optional<SqlCacheContext> sqlCacheContext = statementContext.getSqlCacheContext();
Optional<ResultSet> resultSet = ((ComputeResultSet) physicalPlan)
.computeResultInFe(cascadesContext, sqlCacheContext);
if (resultSet.isPresent()) {
return resultSet;
}
}

ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, ImmutableList.of());
if (sqlCacheContext.isPresent() && enableSqlCache) {
sqlCacheContext.get().setResultSetInFe(resultSet);
Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
statementContext.getConnectContext(),
statementContext.getOriginStatement().originStmt
);
}
return Optional.of(resultSet);
} else if (child instanceof PhysicalHashAggregate && getScanNodes().size() > 0
if (physicalPlan instanceof PhysicalResultSink
&& physicalPlan.child(0) instanceof PhysicalHashAggregate && !getScanNodes().isEmpty()
&& getScanNodes().get(0) instanceof IcebergScanNode) {
List<Column> columns = Lists.newArrayList();
NamedExpression output = physicalPlan.getOutput().get(0);
@@ -329,53 +329,57 @@ public PUniqueId getOrComputeCacheKeyMd5() {
if (cacheKeyMd5 != null) {
return cacheKeyMd5;
}

StringBuilder cacheKey = new StringBuilder(originSql);
for (Entry<FullTableName, String> entry : usedViews.entrySet()) {
cacheKey.append("|")
.append(entry.getKey())
.append("=")
.append(entry.getValue());
}
for (Variable usedVariable : usedVariables) {
cacheKey.append("|")
.append(usedVariable.getType().name())
.append(":")
.append(usedVariable.getName())
.append("=")
.append(usedVariable.getRealExpression().toSql());
}
for (Pair<Expression, Expression> pair : foldNondeterministicPairs) {
cacheKey.append("|")
.append(pair.key().toSql())
.append("=")
.append(pair.value().toSql());
}
for (Entry<FullTableName, List<RowFilterPolicy>> entry : rowPolicies.entrySet()) {
List<RowFilterPolicy> policy = entry.getValue();
if (policy.isEmpty()) {
continue;
}
cacheKey.append("|")
.append(entry.getKey())
.append("=")
.append(policy);
}
for (Entry<FullColumnName, Optional<DataMaskPolicy>> entry : dataMaskPolicies.entrySet()) {
if (!entry.getValue().isPresent()) {
continue;
}
cacheKey.append("|")
.append(entry.getKey())
.append("=")
.append(entry.getValue().map(Object::toString).orElse(""));
}
cacheKeyMd5 = CacheProxy.getMd5(cacheKey.toString());
cacheKeyMd5 = doComputeCacheKeyMd5(usedVariables);
}
}
return cacheKeyMd5;
}

/** doComputeCacheKeyMd5 */
public synchronized PUniqueId doComputeCacheKeyMd5(Set<Variable> usedVariables) {
StringBuilder cacheKey = new StringBuilder(originSql);
for (Entry<FullTableName, String> entry : usedViews.entrySet()) {
cacheKey.append("|")
.append(entry.getKey())
.append("=")
.append(entry.getValue());
}
for (Variable usedVariable : usedVariables) {
cacheKey.append("|")
.append(usedVariable.getType().name())
.append(":")
.append(usedVariable.getName())
.append("=")
.append(usedVariable.getRealExpression().toSql());
}
for (Pair<Expression, Expression> pair : foldNondeterministicPairs) {
cacheKey.append("|")
.append(pair.key().toSql())
.append("=")
.append(pair.value().toSql());
}
for (Entry<FullTableName, List<RowFilterPolicy>> entry : rowPolicies.entrySet()) {
List<RowFilterPolicy> policy = entry.getValue();
if (policy.isEmpty()) {
continue;
}
cacheKey.append("|")
.append(entry.getKey())
.append("=")
.append(policy);
}
for (Entry<FullColumnName, Optional<DataMaskPolicy>> entry : dataMaskPolicies.entrySet()) {
if (!entry.getValue().isPresent()) {
continue;
}
cacheKey.append("|")
.append(entry.getKey())
.append("=")
.append(entry.getValue().map(Object::toString).orElse(""));
}
return CacheProxy.getMd5(cacheKey.toString());
}

public void setOriginSql(String originSql) {
this.originSql = originSql.trim();
}
@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans;

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.SqlCacheContext;
import org.apache.doris.qe.ResultSet;

import java.util.Optional;

/**
* <p>
* This interface is used to return a result set in the FE without sending fragments to the BE.
* Some plans support this function, for example:
* <li>1. the sql `select 100` generates a PhysicalOneRowRelation plan; PhysicalOneRowRelation implements this
* interface, so the FE can send the single row to the client immediately.
* </li>
* <li>2. the sql `select * from tbl limit 0` generates a PhysicalEmptyRelation, meaning no rows are returned;
* PhysicalEmptyRelation implements this interface.
* </li>
* </p>
* <p>
* If you want to cache the result set in the FE, implement this interface and add code like this:
* </p>
* <pre>
* StatementContext statementContext = cascadesContext.getStatementContext();
* boolean enableSqlCache
* = CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable());
* if (sqlCacheContext.isPresent() && enableSqlCache) {
* sqlCacheContext.get().setResultSetInFe(resultSet);
* Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
* statementContext.getConnectContext(),
* statementContext.getOriginStatement().originStmt
* );
* }
* </pre>
*/
public interface ComputeResultSet {
Optional<ResultSet> computeResultInFe(CascadesContext cascadesContext, Optional<SqlCacheContext> sqlCacheContext);
}