Use DataSourceAnalysis throughout the query stack. (#9239)
Builds on #9235, using the datasource analysis functionality to replace various ad-hoc
approaches. The most interesting changes are in ClientQuerySegmentWalker (brokers),
ServerManager (historicals), and SinkQuerySegmentWalker (indexing tasks).

Other changes related to improving how we analyze queries:

1) Changes TimelineServerView to return an Optional timeline, which I thought made
   the analysis changes cleaner to implement. (Usage sketch below.)
2) Added QueryToolChest#canPerformSubquery, which is now used by query entry points to
   determine whether it is safe to pass a subquery dataSource to the query toolchest.
   Fixes an issue introduced in #5471 where subqueries under non-groupBy-typed queries
   were silently ignored, since neither the query entry point nor the toolchest did
   anything special with them. (Guard sketch below.)
3) Removes the QueryPlus.withQuerySegmentSpec method, which was mostly being used in
   error-prone ways (ignoring any potential subqueries, and not verifying that the
   underlying data source is actually a table). Replaces it with a new function,
   Queries.withSpecificSegments, that includes sanity checks. (Usage sketch below.)
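
For illustration, a minimal sketch (not part of this patch) of how a caller can consume the
Optional-returning getTimeline; it mirrors the DataSourceOptimizer change in the diff below, and
the class and method names are invented:

    import org.apache.druid.client.TimelineServerView;
    import org.apache.druid.client.selector.ServerSelector;
    import org.apache.druid.java.util.common.ISE;
    import org.apache.druid.query.Query;
    import org.apache.druid.query.planning.DataSourceAnalysis;
    import org.apache.druid.timeline.TimelineLookup;

    class TimelineLookupSketch
    {
      // Resolve the timeline for the query's base table, or fail fast if none is available.
      static TimelineLookup<String, ServerSelector> timelineFor(TimelineServerView serverView, Query<?> query)
      {
        final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
        return serverView.getTimeline(analysis)
                         .orElseThrow(() -> new ISE("No timeline for dataSource: %s", query.getDataSource()));
      }
    }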
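
For the subquery check, a hedged sketch of the kind of guard a query entry point can now apply;
it assumes the new hook has roughly the shape boolean canPerformSubquery(Query<?>), which is not
shown in the hunks included below:

    import org.apache.druid.java.util.common.ISE;
    import org.apache.druid.query.Query;
    import org.apache.druid.query.QueryDataSource;
    import org.apache.druid.query.QueryToolChest;

    class SubqueryGuardSketch
    {
      // Assumed hook shape: reject subquery dataSources that the toolchest cannot handle,
      // instead of silently ignoring them (the pre-#9239 behavior described above).
      static void checkSubqueryIsSupported(Query<?> query, QueryToolChest<?, ?> toolChest)
      {
        if (query.getDataSource() instanceof QueryDataSource) {
          final Query<?> subquery = ((QueryDataSource) query.getDataSource()).getQuery();
          if (!toolChest.canPerformSubquery(subquery)) {
            throw new ISE("Cannot handle subquery: %s", subquery);
          }
        }
      }
    }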
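
And a usage sketch for Queries.withSpecificSegments, based on its javadoc in the diff below; the
segment interval, version, and partition number here are made up:

    import com.google.common.collect.ImmutableList;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.query.Queries;
    import org.apache.druid.query.Query;
    import org.apache.druid.query.SegmentDescriptor;

    class SpecificSegmentsSketch
    {
      // Pin a table-based query (including any nested subqueries) to one specific segment.
      // Queries.withSpecificSegments throws ISE if the base dataSource is not a table.
      static <T> Query<T> pinToSegment(Query<T> query)
      {
        return Queries.withSpecificSegments(
            query,
            ImmutableList.of(new SegmentDescriptor(Intervals.of("2000-01-01/2000-01-02"), "v1", 0))
        );
      }
    }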
gianm authored Jan 23, 2020
1 parent 479c097 commit f0f6857
Showing 38 changed files with 711 additions and 405 deletions.
@@ -25,7 +25,6 @@
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -58,7 +57,6 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
@@ -89,6 +87,7 @@
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
@@ -126,14 +125,14 @@
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
@@ -217,8 +216,17 @@ public void setup()
.size(0)
.build();
final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator());
LOG.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment);
final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment);
LOG.info(
"Starting benchmark setup using cacheDir[%s], rows[%,d].",
segmentGenerator.getCacheDir(),
rowsPerSegment
);
final QueryableIndex index = segmentGenerator.generate(
dataSegment,
schemaInfo,
Granularities.NONE,
rowsPerSegment
);
queryableIndexes.put(dataSegment, index);
}

@@ -518,12 +526,10 @@ void addSegmentToServer(DruidServer server, DataSegment segment)
.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
}

@Nullable
@Override
public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
{
final String table = Iterables.getOnlyElement(dataSource.getTableNames());
return timelines.get(table);
return Optional.ofNullable(timelines.get(analysis.getBaseTableDataSource().get().getName()));
}

@Override
@@ -563,7 +569,11 @@ private class SimpleQueryRunner implements QueryRunner<Object>
private final QueryRunnerFactoryConglomerate conglomerate;
private final QueryableIndexSegment segment;

public SimpleQueryRunner(QueryRunnerFactoryConglomerate conglomerate, SegmentId segmentId, QueryableIndex queryableIndex)
public SimpleQueryRunner(
QueryRunnerFactoryConglomerate conglomerate,
SegmentId segmentId,
QueryableIndex queryableIndex
)
{
this.conglomerate = conglomerate;
this.segment = new QueryableIndexSegment(queryableIndex, segmentId);
@@ -23,9 +23,11 @@
import com.google.common.collect.ImmutableSortedSet;
import com.google.inject.Inject;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.Query;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.topn.TopNQuery;
@@ -54,24 +56,24 @@ public class DataSourceOptimizer
private ConcurrentHashMap<String, AtomicLong> hitCount = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, AtomicLong> costTime = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, ConcurrentHashMap<Set<String>, AtomicLong>> missFields = new ConcurrentHashMap<>();

@Inject
public DataSourceOptimizer(TimelineServerView serverView)
public DataSourceOptimizer(TimelineServerView serverView)
{
this.serverView = serverView;
}

/**
* Do main work about materialized view selection: transform user query to one or more sub-queries.
*
* In the sub-query, the dataSource is the derivative of dataSource in user query, and sum of all sub-queries'
*
* In the sub-query, the dataSource is the derivative of dataSource in user query, and sum of all sub-queries'
* intervals equals the interval in user query
*
*
* Derived dataSource with smallest average data size per segment granularity have highest priority to replace the
* datasource in user query
*
*
* @param query only TopNQuery/TimeseriesQuery/GroupByQuery can be optimized
* @return a list of queries with specified derived dataSources and intervals
* @return a list of queries with specified derived dataSources and intervals
*/
public List<Query> optimize(Query query)
{
@@ -86,7 +88,7 @@ public List<Query> optimize(Query query)
// get all derivatives for datasource in query. The derivatives set is sorted by average size of
// per segment granularity.
Set<DerivativeDataSource> derivatives = DerivativeDataSourceManager.getDerivatives(datasourceName);

if (derivatives.isEmpty()) {
return Collections.singletonList(query);
}
@@ -96,10 +98,10 @@
hitCount.putIfAbsent(datasourceName, new AtomicLong(0));
costTime.putIfAbsent(datasourceName, new AtomicLong(0));
totalCount.get(datasourceName).incrementAndGet();

// get all fields which the query required
Set<String> requiredFields = MaterializedViewUtils.getRequiredFields(query);

Set<DerivativeDataSource> derivativesWithRequiredFields = new HashSet<>();
for (DerivativeDataSource derivativeDataSource : derivatives) {
derivativesHitCount.putIfAbsent(derivativeDataSource.getName(), new AtomicLong(0));
@@ -115,14 +117,15 @@ public List<Query> optimize(Query query)
costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start);
return Collections.singletonList(query);
}

List<Query> queries = new ArrayList<>();
List<Interval> remainingQueryIntervals = (List<Interval>) query.getIntervals();

for (DerivativeDataSource derivativeDataSource : ImmutableSortedSet.copyOf(derivativesWithRequiredFields)) {
final List<Interval> derivativeIntervals = remainingQueryIntervals.stream()
.flatMap(interval -> serverView
.getTimeline((new TableDataSource(derivativeDataSource.getName())))
.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(derivativeDataSource.getName())))
.orElseThrow(() -> new ISE("No timeline for dataSource: %s", derivativeDataSource.getName()))
.lookup(interval)
.stream()
.map(TimelineObjectHolder::getInterval)
@@ -133,7 +136,7 @@ public List<Query> optimize(Query query)
if (derivativeIntervals.isEmpty()) {
continue;
}

remainingQueryIntervals = MaterializedViewUtils.minus(remainingQueryIntervals, derivativeIntervals);
queries.add(
query.withDataSource(new TableDataSource(derivativeDataSource.getName()))
@@ -158,13 +161,13 @@ public List<Query> optimize(Query query)
hitCount.get(datasourceName).incrementAndGet();
costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start);
return queries;
}
}
finally {
lock.readLock().unlock();
}
}

public List<DataSourceOptimizerStats> getAndResetStats()
public List<DataSourceOptimizerStats> getAndResetStats()
{
ImmutableMap<String, AtomicLong> derivativesHitCountSnapshot;
ImmutableMap<String, AtomicLong> totalCountSnapshot;
@@ -183,7 +186,7 @@ public List<DataSourceOptimizerStats> getAndResetStats()
hitCount.clear();
costTime.clear();
missFields.clear();
}
}
finally {
lock.writeLock().unlock();
}
@@ -49,7 +49,6 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
@@ -62,6 +61,7 @@
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.movingaverage.test.TestConfig;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.server.ClientQuerySegmentWalker;
@@ -84,6 +84,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

@@ -305,9 +306,9 @@ public void testQuery() throws IOException
new TimelineServerView()
{
@Override
public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
{
return null;
return Optional.empty();
}

@Override
@@ -22,7 +22,6 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -51,6 +50,7 @@
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
import org.apache.druid.server.initialization.ServerConfig;
@@ -328,11 +328,13 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
QueryRunner<T> queryRunner = null;
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getTableNames());

if (runningItem != null) {
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final Task task = runningItem.getTask();
if (task.getDataSource().equals(queryDataSource)) {

if (analysis.getBaseTableDataSource().isPresent()
&& task.getDataSource().equals(analysis.getBaseTableDataSource().get().getName())) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);

if (taskQueryRunner != null) {
@@ -379,7 +381,7 @@ public String getTaskType()
{
return task.getType();
}

@Override
public String getDataSource()
{
15 changes: 5 additions & 10 deletions processing/src/main/java/org/apache/druid/query/BaseQuery.java
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
@@ -117,17 +118,11 @@ public QueryRunner<T> getRunner(QuerySegmentWalker walker)
}

@VisibleForTesting
public static QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery query)
public static QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery<?> query)
{
if (query.getDataSource() instanceof QueryDataSource) {
QueryDataSource ds = (QueryDataSource) query.getDataSource();
Query subquery = ds.getQuery();
if (subquery instanceof BaseQuery) {
return getQuerySegmentSpecForLookUp((BaseQuery) subquery);
}
throw new IllegalStateException("Invalid subquery type " + subquery.getClass());
}
return query.getQuerySegmentSpec();
return DataSourceAnalysis.forDataSource(query.getDataSource())
.getBaseQuerySegmentSpec()
.orElse(query.getQuerySegmentSpec());
}

@Override
48 changes: 45 additions & 3 deletions processing/src/main/java/org/apache/druid/query/Queries.java
@@ -23,8 +23,11 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;

import java.util.Collections;
import java.util.HashMap;
@@ -33,9 +36,6 @@
import java.util.Map;
import java.util.Set;

/**
*
*/
@PublicApi
public class Queries
{
@@ -131,4 +131,46 @@ public static List<PostAggregator> prepareAggregations(

return postAggs;
}

/**
* Rewrite "query" to refer to some specific segment descriptors.
*
* The dataSource for "query" must be based on a single table for this operation to be valid. Otherwise, this
* function will throw an exception.
*
* Unlike the seemingly-similar {@code query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(descriptors))},
* this method will walk down subqueries found within the query datasource, if any, and modify the lowest-level
* subquery. The effect is that
* {@code DataSourceAnalysis.forDataSource(query.getDataSource()).getBaseQuerySegmentSpec()} is guaranteed to return
* either {@code new MultipleSpecificSegmentSpec(descriptors)} or empty.
*
* Because {@link BaseQuery#getRunner} is implemented using {@link DataSourceAnalysis#getBaseQuerySegmentSpec}, this
* method will cause the runner to be a specific-segments runner.
*/
public static <T> Query<T> withSpecificSegments(final Query<T> query, final List<SegmentDescriptor> descriptors)
{
final Query<T> retVal;

if (query.getDataSource() instanceof QueryDataSource) {
final Query<?> subQuery = ((QueryDataSource) query.getDataSource()).getQuery();
retVal = query.withDataSource(new QueryDataSource(withSpecificSegments(subQuery, descriptors)));
} else {
retVal = query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(descriptors));
}

// Verify preconditions and invariants, just in case.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(retVal.getDataSource());

if (!analysis.getBaseTableDataSource().isPresent()) {
throw new ISE("Unable to apply specific segments to non-table-based dataSource[%s]", query.getDataSource());
}

if (analysis.getBaseQuerySegmentSpec().isPresent()
&& !analysis.getBaseQuerySegmentSpec().get().equals(new MultipleSpecificSegmentSpec(descriptors))) {
// If you see the error message below, it's a bug in either this function or in DataSourceAnalysis.
throw new ISE("Unable to apply specific segments to query with dataSource[%s]", query.getDataSource());
}

return retVal;
}
}