Add caching support to join queries (#10366)
* Proposed changes for making joins cacheable

* Add unit tests

* Fix tests

* Simplify logic

* Pull empty byte array logic out of CachingQueryRunner

* Remove useless null check

* Minor refactor

* Fix tests

* Fix segment caching on Broker

* Move join cache key computation in Broker

Move join cache key computation on the Broker from ResultLevelCachingQueryRunner to CachingClusteredClient

* Fix compilation

* Review comments

* Add more tests

* Fix inspection errors

* Pushed condition analysis to JoinableFactory

* Review comments

* Disable join caching for broker and add prefix key to BroadcastSegmentIndexedTable

* Remove commented lines

* Fix populateCache

* Disable caching for selected datasources

Refactored the code so that we can decide, at the data source level, whether to enable caching on the Broker or on data nodes.
abhishekagarwal87 authored Oct 10, 2020
1 parent 4c78b51 commit 4d2a92f
Showing 58 changed files with 1,684 additions and 495 deletions.
@@ -25,6 +25,7 @@
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import org.apache.druid.client.CachingClusteredClient;
@@ -103,6 +104,7 @@
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
@@ -339,7 +341,8 @@ public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest
         new DruidHttpClientConfig(),
         processingConfig,
         forkJoinPool,
-        QueryStackTests.DEFAULT_NOOP_SCHEDULER
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER,
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
     );
   }

@@ -366,7 +366,8 @@ public String getFormatString()
           }
         },
         ForkJoinPool.commonPool(),
-        QueryStackTests.DEFAULT_NOOP_SCHEDULER
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER,
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
     );

     ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(

@@ -44,6 +44,7 @@
 import org.apache.druid.timeline.VersionedIntervalTimeline;

 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -104,7 +105,8 @@ <T> QueryRunner<T> buildQueryRunnerForSegment(
       QueryToolChest<T, Query<T>> toolChest,
       VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
       Function<SegmentReference, SegmentReference> segmentMapFn,
-      AtomicLong cpuTimeAccumulator
+      AtomicLong cpuTimeAccumulator,
+      Optional<byte[]> cacheKeyPrefix
   )
   {
     if (query.getContextBoolean(QUERY_RETRY_TEST_CONTEXT_KEY, false)) {
@@ -135,7 +137,8 @@ <T> QueryRunner<T> buildQueryRunnerForSegment(
         toolChest,
         timeline,
         segmentMapFn,
-        cpuTimeAccumulator
+        cpuTimeAccumulator,
+        cacheKeyPrefix
     );
   }
 }
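For context, a minimal sketch of how an `Optional<byte[]>` cache key prefix like the one threaded through above can combine with a per-segment cache key. The helper below is an assumption for illustration, not code from this commit; an absent prefix means per-segment caching is skipped.

```java
import com.google.common.primitives.Bytes;
import java.util.Optional;

public final class CacheKeyPrefixSketch
{
  private CacheKeyPrefixSketch() {}

  // Hypothetical helper: prepend the join-derived prefix to a segment-level key so
  // the same segment joined against different right-hand sides cannot collide.
  public static Optional<byte[]> segmentCacheKey(Optional<byte[]> cacheKeyPrefix, byte[] segmentKey)
  {
    // An empty prefix means the join clauses could not produce a cache key,
    // so per-segment caching is skipped entirely.
    return cacheKeyPrefix.map(prefix -> Bytes.concat(prefix, segmentKey));
  }
}
```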
10 changes: 3 additions & 7 deletions processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -59,14 +59,10 @@ public interface DataSource

   /**
    * Returns true if queries on this dataSource are cacheable at both the result level and per-segment level.
-   * Currently, dataSources that modify the behavior of per-segment processing are not cacheable (like 'join').
-   * Nor are dataSources that do not actually reference segments (like 'inline'), since cache keys are always based
-   * on segment identifiers.
-   *
-   * Note: Ideally, queries on 'join' datasources _would_ be cacheable, but we cannot currently do this due to lacking
-   * the code necessary to compute cache keys properly.
+   * Currently, dataSources that do not actually reference segments (like 'inline'), are not cacheable since cache keys
+   * are always based on segment identifiers.
    */
-  boolean isCacheable();
+  boolean isCacheable(boolean isBroker);

   /**
    * Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or

@@ -48,6 +48,16 @@ public boolean isGlobal()
     return true;
   }

+  /**
+   * Query results from Broadcast datasources should not be cached on broker
+   * https://github.com/apache/druid/issues/10444
+   */
+  @Override
+  public boolean isCacheable(boolean isBroker)
+  {
+    return !isBroker;
+  }
+
   @Override
   public String toString()
   {

@@ -168,7 +168,7 @@ public DataSource withChildren(List<DataSource> children)
   }

   @Override
-  public boolean isCacheable()
+  public boolean isCacheable(boolean isBroker)
   {
     return false;
   }

@@ -28,8 +28,8 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.JoinPrefixUtils;
 import org.apache.druid.segment.join.JoinType;
-import org.apache.druid.segment.join.Joinables;

 import java.util.HashSet;
 import java.util.List;
@@ -69,7 +69,7 @@ private JoinDataSource(
   {
     this.left = Preconditions.checkNotNull(left, "left");
     this.right = Preconditions.checkNotNull(right, "right");
-    this.rightPrefix = Joinables.validatePrefix(rightPrefix);
+    this.rightPrefix = JoinPrefixUtils.validatePrefix(rightPrefix);
     this.conditionAnalysis = Preconditions.checkNotNull(conditionAnalysis, "conditionAnalysis");
     this.joinType = Preconditions.checkNotNull(joinType, "joinType");
   }
@@ -175,9 +175,9 @@ public DataSource withChildren(List<DataSource> children)
   }

   @Override
-  public boolean isCacheable()
+  public boolean isCacheable(boolean isBroker)
   {
-    return false;
+    return left.isCacheable(isBroker) && right.isCacheable(isBroker);
   }

   @Override
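Taken together with the GlobalTableDataSource override above, cacheability now composes over the whole join tree. A minimal illustration based on the implementations shown in this diff (the wrapper class is hypothetical): a regular table returns true, a broadcast global table returns !isBroker, and a join requires both sides.

```java
import org.apache.druid.query.DataSource;

public final class CacheabilitySketch
{
  private CacheabilitySketch() {}

  // For a broadcast ("global") table joined onto a regular table:
  //   isCacheable(false) -> true  on data nodes (both sides cacheable)
  //   isCacheable(true)  -> false on the Broker (the broadcast side opts out)
  public static boolean canCache(DataSource joinDataSource, boolean isBroker)
  {
    return joinDataSource.isCacheable(isBroker);
  }
}
```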
@@ -79,7 +79,7 @@ public DataSource withChildren(List<DataSource> children)
   }

   @Override
-  public boolean isCacheable()
+  public boolean isCacheable(boolean isBroker)
   {
     return false;
   }

@@ -71,7 +71,7 @@ public DataSource withChildren(List<DataSource> children)
   }

   @Override
-  public boolean isCacheable()
+  public boolean isCacheable(boolean isBroker)
   {
     return false;
   }

@@ -76,7 +76,7 @@ public DataSource withChildren(List<DataSource> children)
   }

   @Override
-  public boolean isCacheable()
+  public boolean isCacheable(boolean isBroker)
   {
     return true;
   }

@@ -83,7 +83,7 @@ public DataSource withChildren(List<DataSource> children)
   }

   @Override
-  public boolean isCacheable()
+  public boolean isCacheable(boolean isBroker)
   {
     // Disables result-level caching for 'union' datasources, which doesn't work currently.
     // See https://github.com/apache/druid/issues/8713 for reference.

@@ -23,6 +23,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
 import com.google.common.primitives.UnsignedBytes;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.java.util.common.Cacheable;
@@ -65,6 +66,7 @@ public class CacheKeyBuilder
   static final byte CACHEABLE_KEY = 9;
   static final byte CACHEABLE_LIST_KEY = 10;
   static final byte DOUBLE_ARRAY_KEY = 11;
+  static final byte LONG_KEY = 12;

   static final byte[] STRING_SEPARATOR = new byte[]{(byte) 0xFF};
   static final byte[] EMPTY_BYTES = StringUtils.EMPTY_BYTES;
@@ -250,6 +252,12 @@ public CacheKeyBuilder appendInt(int input)
     return this;
   }

+  public CacheKeyBuilder appendLong(long input)
+  {
+    appendItem(LONG_KEY, Longs.toByteArray(input));
+    return this;
+  }
+
   public CacheKeyBuilder appendFloat(float input)
   {
     appendItem(FLOAT_KEY, ByteBuffer.allocate(Float.BYTES).putFloat(input).array());
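For illustration, a sketch of the new appendLong() in use, following CacheKeyBuilder's existing fluent API; the id byte 0x2F, the values, and the surrounding class are arbitrary stand-ins.

```java
import org.apache.druid.query.cache.CacheKeyBuilder;

public final class CacheKeyBuilderExample
{
  private CacheKeyBuilderExample() {}

  public static byte[] exampleKey()
  {
    // appendLong() writes the LONG_KEY type byte plus the 8-byte big-endian
    // encoding produced by Longs.toByteArray().
    return new CacheKeyBuilder((byte) 0x2F)
        .appendString("wikipedia")
        .appendLong(1602288000000L) // e.g. an interval start in millis
        .build();
  }
}
```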
@@ -280,6 +280,14 @@ public boolean isQuery()
     return dataSource instanceof QueryDataSource;
   }

+  /**
+   * Returns true if this datasource is made out of a join operation
+   */
+  public boolean isJoin()
+  {
+    return !preJoinableClauses.isEmpty();
+  }
+
   @Override
   public boolean equals(Object o)
   {
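The surrounding class appears to be DataSourceAnalysis (its isQuery() and preJoinableClauses are visible above). Assuming that, a small sketch of how a caller might gate join cache key work on the new flag; the helper name is illustrative only.

```java
import org.apache.druid.query.Query;
import org.apache.druid.query.planning.DataSourceAnalysis;

public final class JoinCheckSketch
{
  private JoinCheckSketch() {}

  public static boolean needsJoinCacheKeyPrefix(Query<?> query)
  {
    DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
    // Only join queries need a cache key prefix derived from their join clauses;
    // everything else keeps the pre-existing segment-identifier-based keys.
    return analysis.isJoin();
  }
}
```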
@@ -22,8 +22,8 @@
 import com.google.common.base.Preconditions;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.JoinPrefixUtils;
 import org.apache.druid.segment.join.JoinType;
-import org.apache.druid.segment.join.Joinables;

 import java.util.Objects;

@@ -46,7 +46,7 @@ public PreJoinableClause(
       final JoinConditionAnalysis condition
   )
   {
-    this.prefix = Joinables.validatePrefix(prefix);
+    this.prefix = JoinPrefixUtils.validatePrefix(prefix);
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.joinType = Preconditions.checkNotNull(joinType, "joinType");
     this.condition = Preconditions.checkNotNull(condition, "condition");
@@ -131,9 +131,9 @@ public static JoinConditionAnalysis forExpression(

   private static boolean isLeftExprAndRightColumn(final Expr a, final Expr b, final String rightPrefix)
   {
-    return a.analyzeInputs().getRequiredBindings().stream().noneMatch(c -> Joinables.isPrefixedBy(c, rightPrefix))
+    return a.analyzeInputs().getRequiredBindings().stream().noneMatch(c -> JoinPrefixUtils.isPrefixedBy(c, rightPrefix))
            && b.getBindingIfIdentifier() != null
-           && Joinables.isPrefixedBy(b.getBindingIfIdentifier(), rightPrefix);
+           && JoinPrefixUtils.isPrefixedBy(b.getBindingIfIdentifier(), rightPrefix);
   }

   /**
@@ -0,0 +1,86 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.segment.join;

import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.column.ColumnHolder;

import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;

/**
 * Utility class for working with prefixes in join operations
 */
public class JoinPrefixUtils
{
  private static final Comparator<String> DESCENDING_LENGTH_STRING_COMPARATOR = (s1, s2) ->
      Integer.compare(s2.length(), s1.length());

  /**
   * Checks that "prefix" is a valid prefix for a join clause (see {@link JoinableClause#getPrefix()}) and, if so,
   * returns it. Otherwise, throws an exception.
   */
  public static String validatePrefix(@Nullable final String prefix)
  {
    if (prefix == null || prefix.isEmpty()) {
      throw new IAE("Join clause cannot have null or empty prefix");
    } else if (isPrefixedBy(ColumnHolder.TIME_COLUMN_NAME, prefix) || ColumnHolder.TIME_COLUMN_NAME.equals(prefix)) {
      throw new IAE(
          "Join clause cannot have prefix[%s], since it would shadow %s",
          prefix,
          ColumnHolder.TIME_COLUMN_NAME
      );
    } else {
      return prefix;
    }
  }

  public static boolean isPrefixedBy(final String columnName, final String prefix)
  {
    return columnName.length() > prefix.length() && columnName.startsWith(prefix);
  }

  /**
   * Check if any prefixes in the provided list duplicate or shadow each other.
   *
   * @param prefixes A mutable list containing the prefixes to check. This list will be sorted by descending
   *                 string length.
   */
  public static void checkPrefixesForDuplicatesAndShadowing(
      final List<String> prefixes
  )
  {
    // this is a naive approach that assumes we'll typically handle only a small number of prefixes
    prefixes.sort(DESCENDING_LENGTH_STRING_COMPARATOR);
    for (int i = 0; i < prefixes.size(); i++) {
      String prefix = prefixes.get(i);
      for (int k = i + 1; k < prefixes.size(); k++) {
        String otherPrefix = prefixes.get(k);
        if (prefix.equals(otherPrefix)) {
          throw new IAE("Detected duplicate prefix in join clauses: [%s]", prefix);
        }
        if (isPrefixedBy(prefix, otherPrefix)) {
          throw new IAE("Detected conflicting prefixes in join clauses: [%s, %s]", prefix, otherPrefix);
        }
      }
    }
  }
}
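A quick demonstration of the validation rules implemented above (the demo class itself is only for illustration; everything it exercises is in JoinPrefixUtils as shown).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.druid.segment.join.JoinPrefixUtils;

public final class JoinPrefixUtilsDemo
{
  private JoinPrefixUtilsDemo() {}

  public static void main(String[] args)
  {
    JoinPrefixUtils.validatePrefix("j."); // ok: returns "j."
    // JoinPrefixUtils.validatePrefix("__t"); // would throw IAE: shadows __time

    // "j.x." is shadowed by "j.", so this throws IAE("Detected conflicting prefixes ...").
    // Note the list must be mutable, since the method sorts it in place.
    List<String> prefixes = new ArrayList<>(Arrays.asList("j.", "j.x."));
    JoinPrefixUtils.checkPrefixesForDuplicatesAndShadowing(prefixes);
  }
}
```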
@@ -33,8 +33,8 @@
  * Represents everything about a join clause except for the left-hand datasource. In other words, if the full join
  * clause is "t1 JOIN t2 ON t1.x = t2.x" then this class represents "JOIN t2 ON x = t2.x" -- it does not include
  * references to the left-hand "t1".
- *
- * Created from {@link org.apache.druid.query.planning.PreJoinableClause} by {@link Joinables#createSegmentMapFn}.
+ * <p>
+ * Created from {@link org.apache.druid.query.planning.PreJoinableClause} by {@link JoinableFactoryWrapper#createSegmentMapFn}.
  */
 public class JoinableClause implements ReferenceCountedObject
 {
@@ -45,7 +45,7 @@ public class JoinableClause implements ReferenceCountedObject

   public JoinableClause(String prefix, Joinable joinable, JoinType joinType, JoinConditionAnalysis condition)
   {
-    this.prefix = Joinables.validatePrefix(prefix);
+    this.prefix = JoinPrefixUtils.validatePrefix(prefix);
     this.joinable = Preconditions.checkNotNull(joinable, "joinable");
     this.joinType = Preconditions.checkNotNull(joinType, "joinType");
     this.condition = Preconditions.checkNotNull(condition, "condition");
@@ -106,7 +106,7 @@ public List<String> getAvailableColumnsPrefixed()
    */
   public boolean includesColumn(final String columnName)
   {
-    return Joinables.isPrefixedBy(columnName, prefix);
+    return JoinPrefixUtils.isPrefixedBy(columnName, prefix);
   }

   /**
@@ -43,8 +43,19 @@ public interface JoinableFactory
    *
    * @param dataSource the datasource to join on
    * @param condition  the condition to join on
-   *
    * @return a Joinable if this datasource + condition combo is joinable; empty if not
    */
   Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition);
+
+  /**
+   * Compute the cache key for a data source participating in a join operation. This is done separately from
+   * {@link #build(DataSource, JoinConditionAnalysis)}, which can be an expensive operation and can potentially be
+   * avoided if cached results can be used.
+   *
+   * @param dataSource the datasource to join on
+   * @param condition  the condition to join on
+   */
+  default Optional<byte[]> computeJoinCacheKey(DataSource dataSource, JoinConditionAnalysis condition)
+  {
+    return Optional.empty();
+  }
 }
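A minimal caller-side sketch of the pattern this Javadoc describes: compute the cheap cache key first and only pay for build() on a miss. The class and its map-backed cache are hypothetical stand-ins, not the actual Broker code path.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import org.apache.druid.query.DataSource;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinableFactory;

public class JoinCacheLookupSketch
{
  // Stand-in for whatever result cache the query runner actually consults.
  private final Map<ByteBuffer, byte[]> resultCache;
  private final JoinableFactory joinableFactory;

  public JoinCacheLookupSketch(Map<ByteBuffer, byte[]> resultCache, JoinableFactory joinableFactory)
  {
    this.resultCache = resultCache;
    this.joinableFactory = joinableFactory;
  }

  // Consult the cheap cache key first; only on a miss would a caller go on to
  // invoke the expensive build().
  public Optional<byte[]> fetchCachedResults(DataSource dataSource, JoinConditionAnalysis condition)
  {
    Optional<byte[]> cacheKey = joinableFactory.computeJoinCacheKey(dataSource, condition);
    // Optional.empty() means this clause cannot participate in caching at all.
    return cacheKey.map(key -> resultCache.get(ByteBuffer.wrap(key)));
  }
}
```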