diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index d7b8b63bedfe..72fefa93946a 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -25,7 +25,6 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo; import org.apache.druid.benchmark.datagen.BenchmarkSchemas; @@ -58,7 +57,6 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.BySegmentQueryRunner; -import org.apache.druid.query.DataSource; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Druids; @@ -89,6 +87,7 @@ import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.query.groupby.strategy.GroupByStrategyV1; import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; @@ -126,7 +125,6 @@ import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; -import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; @@ -134,6 +132,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; @@ -217,8 +216,17 @@ public void setup() .size(0) .build(); final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator()); - LOG.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment); - final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment); + LOG.info( + "Starting benchmark setup using cacheDir[%s], rows[%,d].", + segmentGenerator.getCacheDir(), + rowsPerSegment + ); + final QueryableIndex index = segmentGenerator.generate( + dataSegment, + schemaInfo, + Granularities.NONE, + rowsPerSegment + ); queryableIndexes.put(dataSegment, index); } @@ -518,12 +526,10 @@ void addSegmentToServer(DruidServer server, DataSegment segment) .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); } - @Nullable @Override - public TimelineLookup getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { - final String table = Iterables.getOnlyElement(dataSource.getTableNames()); - return timelines.get(table); + return Optional.ofNullable(timelines.get(analysis.getBaseTableDataSource().get().getName())); } @Override @@ -563,7 +569,11 @@ private class SimpleQueryRunner implements QueryRunner private final QueryRunnerFactoryConglomerate conglomerate; private final QueryableIndexSegment segment; - public 
SimpleQueryRunner(QueryRunnerFactoryConglomerate conglomerate, SegmentId segmentId, QueryableIndex queryableIndex) + public SimpleQueryRunner( + QueryRunnerFactoryConglomerate conglomerate, + SegmentId segmentId, + QueryableIndex queryableIndex + ) { this.conglomerate = conglomerate; this.segment = new QueryableIndexSegment(queryableIndex, segmentId); diff --git a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java index bbd6aa8a4f2b..1653539f3e36 100644 --- a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java +++ b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java @@ -23,9 +23,11 @@ import com.google.common.collect.ImmutableSortedSet; import com.google.inject.Inject; import org.apache.druid.client.TimelineServerView; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.Query; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.topn.TopNQuery; @@ -54,24 +56,24 @@ public class DataSourceOptimizer private ConcurrentHashMap hitCount = new ConcurrentHashMap<>(); private ConcurrentHashMap costTime = new ConcurrentHashMap<>(); private ConcurrentHashMap, AtomicLong>> missFields = new ConcurrentHashMap<>(); - + @Inject - public DataSourceOptimizer(TimelineServerView serverView) + public DataSourceOptimizer(TimelineServerView serverView) { this.serverView = serverView; } /** * Do main work about materialized view selection: transform user query to one or more sub-queries. - * - * In the sub-query, the dataSource is the derivative of dataSource in user query, and sum of all sub-queries' + * + * In the sub-query, the dataSource is the derivative of dataSource in user query, and sum of all sub-queries' * intervals equals the interval in user query - * + * * Derived dataSource with smallest average data size per segment granularity have highest priority to replace the * datasource in user query - * + * * @param query only TopNQuery/TimeseriesQuery/GroupByQuery can be optimized - * @return a list of queries with specified derived dataSources and intervals + * @return a list of queries with specified derived dataSources and intervals */ public List optimize(Query query) { @@ -86,7 +88,7 @@ public List optimize(Query query) // get all derivatives for datasource in query. The derivatives set is sorted by average size of // per segment granularity. 
Set derivatives = DerivativeDataSourceManager.getDerivatives(datasourceName); - + if (derivatives.isEmpty()) { return Collections.singletonList(query); } @@ -96,10 +98,10 @@ public List optimize(Query query) hitCount.putIfAbsent(datasourceName, new AtomicLong(0)); costTime.putIfAbsent(datasourceName, new AtomicLong(0)); totalCount.get(datasourceName).incrementAndGet(); - + // get all fields which the query required Set requiredFields = MaterializedViewUtils.getRequiredFields(query); - + Set derivativesWithRequiredFields = new HashSet<>(); for (DerivativeDataSource derivativeDataSource : derivatives) { derivativesHitCount.putIfAbsent(derivativeDataSource.getName(), new AtomicLong(0)); @@ -115,14 +117,15 @@ public List optimize(Query query) costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start); return Collections.singletonList(query); } - + List queries = new ArrayList<>(); List remainingQueryIntervals = (List) query.getIntervals(); - + for (DerivativeDataSource derivativeDataSource : ImmutableSortedSet.copyOf(derivativesWithRequiredFields)) { final List derivativeIntervals = remainingQueryIntervals.stream() .flatMap(interval -> serverView - .getTimeline((new TableDataSource(derivativeDataSource.getName()))) + .getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(derivativeDataSource.getName()))) + .orElseThrow(() -> new ISE("No timeline for dataSource: %s", derivativeDataSource.getName())) .lookup(interval) .stream() .map(TimelineObjectHolder::getInterval) @@ -133,7 +136,7 @@ public List optimize(Query query) if (derivativeIntervals.isEmpty()) { continue; } - + remainingQueryIntervals = MaterializedViewUtils.minus(remainingQueryIntervals, derivativeIntervals); queries.add( query.withDataSource(new TableDataSource(derivativeDataSource.getName())) @@ -158,13 +161,13 @@ public List optimize(Query query) hitCount.get(datasourceName).incrementAndGet(); costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start); return queries; - } + } finally { lock.readLock().unlock(); } } - public List getAndResetStats() + public List getAndResetStats() { ImmutableMap derivativesHitCountSnapshot; ImmutableMap totalCountSnapshot; @@ -183,7 +186,7 @@ public List getAndResetStats() hitCount.clear(); costTime.clear(); missFields.clear(); - } + } finally { lock.writeLock().unlock(); } diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 83881c79cf97..9090bfe168d8 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -49,7 +49,6 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.query.DataSource; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; @@ -62,6 +61,7 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.movingaverage.test.TestConfig; +import org.apache.druid.query.planning.DataSourceAnalysis; import 
org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.server.ClientQuerySegmentWalker; @@ -84,6 +84,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; @@ -305,9 +306,9 @@ public void testQuery() throws IOException new TimelineServerView() { @Override - public TimelineLookup getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { - return null; + return Optional.empty(); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index ecb5d9ab03da..7e7fafac3e8b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -22,7 +22,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -51,6 +50,7 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.DruidNode; import org.apache.druid.server.SetAndVerifyContextQueryRunner; import org.apache.druid.server.initialization.ServerConfig; @@ -328,11 +328,13 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable QueryRunner getQueryRunnerImpl(Query query) { QueryRunner queryRunner = null; - final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getTableNames()); if (runningItem != null) { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); final Task task = runningItem.getTask(); - if (task.getDataSource().equals(queryDataSource)) { + + if (analysis.getBaseTableDataSource().isPresent() + && task.getDataSource().equals(analysis.getBaseTableDataSource().get().getName())) { final QueryRunner taskQueryRunner = task.getQueryRunner(query); if (taskQueryRunner != null) { @@ -379,7 +381,7 @@ public String getTaskType() { return task.getType(); } - + @Override public String getDataSource() { diff --git a/processing/src/main/java/org/apache/druid/query/BaseQuery.java b/processing/src/main/java/org/apache/druid/query/BaseQuery.java index cc2cd6df6b4c..70fbdf992707 100644 --- a/processing/src/main/java/org/apache/druid/query/BaseQuery.java +++ b/processing/src/main/java/org/apache/druid/query/BaseQuery.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.QuerySegmentSpec; import org.joda.time.DateTimeZone; import org.joda.time.Duration; @@ -117,17 +118,11 @@ public QueryRunner getRunner(QuerySegmentWalker walker) } @VisibleForTesting - public static 
QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery query)
+  public static QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery<?> query)
   {
-    if (query.getDataSource() instanceof QueryDataSource) {
-      QueryDataSource ds = (QueryDataSource) query.getDataSource();
-      Query subquery = ds.getQuery();
-      if (subquery instanceof BaseQuery) {
-        return getQuerySegmentSpecForLookUp((BaseQuery) subquery);
-      }
-      throw new IllegalStateException("Invalid subquery type " + subquery.getClass());
-    }
-    return query.getQuerySegmentSpec();
+    return DataSourceAnalysis.forDataSource(query.getDataSource())
+                             .getBaseQuerySegmentSpec()
+                             .orElse(query.getQuerySegmentSpec());
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/Queries.java b/processing/src/main/java/org/apache/druid/query/Queries.java
index 37408a4aea3f..a2ff0051bf94 100644
--- a/processing/src/main/java/org/apache/druid/query/Queries.java
+++ b/processing/src/main/java/org/apache/druid/query/Queries.java
@@ -23,8 +23,11 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.druid.guice.annotations.PublicApi;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.planning.DataSourceAnalysis;
+import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,9 +36,6 @@
 import java.util.Map;
 import java.util.Set;
 
-/**
- *
- */
 @PublicApi
 public class Queries
 {
@@ -131,4 +131,46 @@ public static List<PostAggregator> prepareAggregations(
 
     return postAggs;
   }
+
+  /**
+   * Rewrite "query" to refer to some specific segment descriptors.
+   *
+   * The dataSource for "query" must be based on a single table for this operation to be valid. Otherwise, this
+   * function will throw an exception.
+   *
+   * Unlike the seemingly-similar {@code query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(descriptors))},
+   * this method will walk down subqueries found within the query datasource, if any, and modify the lowest-level
+   * subquery. The effect is that
+   * {@code DataSourceAnalysis.forDataSource(query.getDataSource()).getBaseQuerySegmentSpec()} is guaranteed to return
+   * either {@code new MultipleSpecificSegmentSpec(descriptors)} or empty.
+   *
+   * Because {@link BaseQuery#getRunner} is implemented using {@link DataSourceAnalysis#getBaseQuerySegmentSpec}, this
+   * method will cause the runner to be a specific-segments runner.
+   */
+  public static <T> Query<T> withSpecificSegments(final Query<T> query, final List<SegmentDescriptor> descriptors)
+  {
+    final Query<T> retVal;
+
+    if (query.getDataSource() instanceof QueryDataSource) {
+      final Query<?> subQuery = ((QueryDataSource) query.getDataSource()).getQuery();
+      retVal = query.withDataSource(new QueryDataSource(withSpecificSegments(subQuery, descriptors)));
+    } else {
+      retVal = query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(descriptors));
+    }
+
+    // Verify preconditions and invariants, just in case.
+ final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(retVal.getDataSource()); + + if (!analysis.getBaseTableDataSource().isPresent()) { + throw new ISE("Unable to apply specific segments to non-table-based dataSource[%s]", query.getDataSource()); + } + + if (analysis.getBaseQuerySegmentSpec().isPresent() + && !analysis.getBaseQuerySegmentSpec().get().equals(new MultipleSpecificSegmentSpec(descriptors))) { + // If you see the error message below, it's a bug in either this function or in DataSourceAnalysis. + throw new ISE("Unable to apply specific segments to query with dataSource[%s]", query.getDataSource()); + } + + return retVal; + } } diff --git a/processing/src/main/java/org/apache/druid/query/Query.java b/processing/src/main/java/org/apache/druid/query/Query.java index 387509839bb5..37868b4a25d4 100644 --- a/processing/src/main/java/org/apache/druid/query/Query.java +++ b/processing/src/main/java/org/apache/druid/query/Query.java @@ -116,6 +116,12 @@ public interface Query Query withOverriddenContext(Map contextOverride); + /** + * Returns a new query, identical to this one, but with a different associated {@link QuerySegmentSpec}. + * + * This often changes the behavior of {@link #getRunner(QuerySegmentWalker)}, since most queries inherit that method + * from {@link BaseQuery}, which implements it by calling {@link QuerySegmentSpec#lookup}. + */ Query withQuerySegmentSpec(QuerySegmentSpec spec); Query withId(String id); @@ -140,14 +146,4 @@ default Query optimizeForSegment(PerSegmentQueryOptimizationContext optimizat { return this; } - - default List getIntervalsOfInnerMostQuery() - { - if (getDataSource() instanceof QueryDataSource) { - //noinspection unchecked - return ((QueryDataSource) getDataSource()).getQuery().getIntervalsOfInnerMostQuery(); - } else { - return getIntervals(); - } - } } diff --git a/processing/src/main/java/org/apache/druid/query/QueryPlus.java b/processing/src/main/java/org/apache/druid/query/QueryPlus.java index f1884d356242..1b18e9439099 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryPlus.java +++ b/processing/src/main/java/org/apache/druid/query/QueryPlus.java @@ -24,7 +24,6 @@ import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.spec.QuerySegmentSpec; import javax.annotation.Nullable; @@ -125,14 +124,6 @@ private QueryPlus withoutQueryMetrics() } } - /** - * Equivalent of withQuery(getQuery().withQuerySegmentSpec(spec)). - */ - public QueryPlus withQuerySegmentSpec(QuerySegmentSpec spec) - { - return new QueryPlus<>(query.withQuerySegmentSpec(spec), queryMetrics, identity); - } - /** * Equivalent of withQuery(getQuery().withOverriddenContext(ImmutableMap.of(MAX_QUEUED_BYTES_KEY, maxQueuedBytes))). */ diff --git a/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java b/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java index 970fb6e28c08..7084a80935d1 100644 --- a/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java +++ b/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java @@ -22,6 +22,7 @@ import org.joda.time.Interval; /** + * An interface for query-handling entry points. 
 */
 public interface QuerySegmentWalker
 {
@@ -29,19 +30,27 @@ public interface QuerySegmentWalker
    * Gets the Queryable for a given interval, the Queryable returned can be any version(s) or partitionNumber(s)
    * such that it represents the interval.
    *
-   * @param <T> query result type
-   * @param query the query to find a Queryable for
+   * @param <T>       query result type
+   * @param query     the query to find a Queryable for
+   * @param intervals the intervals to find a Queryable for
+   *
    * @return a Queryable object that represents the interval
    */
   <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals);
 
   /**
-   * Gets the Queryable for a given list of SegmentSpecs.
+   * Gets the Queryable for a given list of SegmentDescriptors.
+   *
+   * The descriptors are expected to apply to the base datasource involved in the query, i.e. the one returned by:
   *
-   * @param <T> the query result type
+   * <pre>
+   *   DataSourceAnalysis.forDataSource(query.getDataSource()).getBaseDataSource()
+   * </pre>
+   *
+   * @param <T>   the query result type
   * @param query the query to return a Queryable for
+   * @param specs the list of SegmentSpecs to find a Queryable for
+   *
   * @return the Queryable object with the given SegmentSpecs
   */
  <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs);
diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
index c271af8e4d5c..b72d9d76eb4e 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
@@ -271,6 +271,20 @@ public <T extends LogicalSegment> List<T> filterSegments(QueryType query, List<T
   {
     return segments;
   }
 
+  /**
+   * Returns whether this toolchest is able to handle the provided subquery.
+   *
+   * When this method returns true, the core query stack will pass subquery datasources over to the toolchest and will
+   * assume they are properly handled.
+   *
+   * When this method returns false, the core query stack will throw an error if subqueries are present. In the future,
+   * instead of throwing an error, the core query stack will handle the subqueries on its own.
+   */
+  public boolean canPerformSubquery(Query<?> subquery)
+  {
+    return false;
+  }
+
   /**
    * Returns a list of field names in the order that {@link #resultsAsArrays} would return them. The returned list will
    * be the same length as each array returned by {@link #resultsAsArrays}.
diff --git a/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java b/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java
index 6b991b870575..fa337d047899 100644
--- a/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java
+++ b/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java
@@ -30,7 +30,6 @@
 import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.context.ResponseContext;
-import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.segment.SegmentMissingException;
 
 import java.util.ArrayList;
@@ -73,10 +72,8 @@ public Yielder<OutType> toYielder(OutType initValue, YieldingAccumulat
           log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i);
 
           context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
-          final QueryPlus<T> retryQueryPlus = queryPlus.withQuerySegmentSpec(
-              new MultipleSpecificSegmentSpec(
-                  missingSegments
-              )
+          final QueryPlus<T> retryQueryPlus = queryPlus.withQuery(
+              Queries.withSpecificSegments(queryPlus.getQuery(), missingSegments)
           );
           Sequence<T> retrySequence = baseRunner.run(retryQueryPlus, context);
           listOfSequences.add(retrySequence);
diff --git a/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java b/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java
index 0beca6849a4f..88c88b324890 100644
--- a/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java
+++ b/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java
@@ -92,8 +92,11 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext respo
       );
       return Sequences.map(
           baseRunner.run(
-              queryPlus.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(
-                  Collections.singletonList(modifiedInterval))),
+              queryPlus.withQuery(
+                  queryPlus.getQuery().withQuerySegmentSpec(
+                      new MultipleIntervalSegmentSpec(Collections.singletonList(modifiedInterval))
+                  )
+              ),
               responseContext
           ),
           new Function<T, T>()
diff --git a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
index 03966e944445..00a84fc92d6d 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java
@@ -20,7 +20,6 @@
 package org.apache.druid.query.filter;
 
 import com.google.common.base.Function;
-import com.google.common.base.Optional;
 import com.google.common.collect.RangeSet;
import org.apache.druid.timeline.partition.ShardSpec; @@ -29,9 +28,11 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** + * */ public class DimFilterUtils { @@ -87,14 +88,15 @@ static byte[] computeCacheKey(byte cacheIdKey, List filters) * {@link #filterShards(DimFilter, Iterable, Function, Map)} instead with a cached map * * @param dimFilter The filter to use - * @param input The iterable of objects to be filtered + * @param input The iterable of objects to be filtered * @param converter The function to convert T to ShardSpec that can be filtered by - * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * * @return The set of filtered object, in the same order as input */ public static Set filterShards(DimFilter dimFilter, Iterable input, Function converter) { - return filterShards(dimFilter, input, converter, new HashMap>>()); + return filterShards(dimFilter, input, converter, new HashMap<>()); } /** @@ -106,15 +108,20 @@ public static Set filterShards(DimFilter dimFilter, Iterable input, Fu * between calls with the same dimFilter to save redundant calls of {@link DimFilter#getDimensionRangeSet(String)} * on same dimensions. * - * @param dimFilter The filter to use - * @param input The iterable of objects to be filtered - * @param converter The function to convert T to ShardSpec that can be filtered by + * @param dimFilter The filter to use + * @param input The iterable of objects to be filtered + * @param converter The function to convert T to ShardSpec that can be filtered by * @param dimensionRangeCache The cache of RangeSets of different dimensions for the dimFilter - * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * * @return The set of filtered object, in the same order as input */ - public static Set filterShards(DimFilter dimFilter, Iterable input, Function converter, - Map>> dimensionRangeCache) + public static Set filterShards( + final DimFilter dimFilter, + final Iterable input, + final Function converter, + final Map>> dimensionRangeCache + ) { Set retSet = new LinkedHashSet<>(); @@ -127,7 +134,7 @@ public static Set filterShards(DimFilter dimFilter, Iterable input, Fu List dimensions = shard.getDomainDimensions(); for (String dimension : dimensions) { Optional> optFilterRangeSet = dimensionRangeCache - .computeIfAbsent(dimension, d -> Optional.fromNullable(dimFilter.getDimensionRangeSet(d))); + .computeIfAbsent(dimension, d -> Optional.ofNullable(dimFilter.getDimensionRangeSet(d))); if (optFilterRangeSet.isPresent()) { filterDomain.put(dimension, optFilterRangeSet.get()); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index acfe77688e6a..ebae09f184f2 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -663,6 +663,26 @@ public ResultRow apply(Object input) }; } + @Override + public boolean canPerformSubquery(Query subquery) + { + Query current = subquery; + + while (current != null) { + if (!(current 
instanceof GroupByQuery)) { + return false; + } + + if (current.getDataSource() instanceof QueryDataSource) { + current = ((QueryDataSource) current.getDataSource()).getQuery(); + } else { + current = null; + } + } + + return true; + } + @Override public List resultArrayFields(final GroupByQuery query) { diff --git a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java index 34a458d2aaf3..5d0c853823ad 100644 --- a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java +++ b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java @@ -30,6 +30,7 @@ import org.joda.time.Interval; import java.util.List; +import java.util.Objects; /** */ @@ -93,24 +94,13 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - MultipleSpecificSegmentSpec that = (MultipleSpecificSegmentSpec) o; - - if (descriptors != null ? !descriptors.equals(that.descriptors) : that.descriptors != null) { - return false; - } - if (intervals != null ? !intervals.equals(that.intervals) : that.intervals != null) { - return false; - } - - return true; + return Objects.equals(descriptors, that.descriptors); } @Override public int hashCode() { - int result = descriptors != null ? descriptors.hashCode() : 0; - result = 31 * result + (intervals != null ? intervals.hashCode() : 0); - return result; + return Objects.hash(descriptors); } } diff --git a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java index 625f0325229e..0cea0dbf325a 100644 --- a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.guava.YieldingAccumulator; +import org.apache.druid.query.Queries; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -38,6 +39,7 @@ import java.util.Collections; /** + * */ public class SpecificSegmentQueryRunner implements QueryRunner { @@ -56,7 +58,13 @@ public SpecificSegmentQueryRunner( @Override public Sequence run(final QueryPlus input, final ResponseContext responseContext) { - final QueryPlus queryPlus = input.withQuerySegmentSpec(specificSpec); + final QueryPlus queryPlus = input.withQuery( + Queries.withSpecificSegments( + input.getQuery(), + Collections.singletonList(specificSpec.getDescriptor()) + ) + ); + final Query query = queryPlus.getQuery(); final Thread currThread = Thread.currentThread(); diff --git a/processing/src/test/java/org/apache/druid/query/QueriesTest.java b/processing/src/test/java/org/apache/druid/query/QueriesTest.java index fd5abf238297..16c4783619f8 100644 --- a/processing/src/test/java/org/apache/druid/query/QueriesTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueriesTest.java @@ -20,6 +20,8 @@ package org.apache.druid.query; import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.aggregation.AggregatorFactory; import 
org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -27,17 +29,26 @@ import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.Arrays; import java.util.Collections; import java.util.List; /** + * */ public class QueriesTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testVerifyAggregations() { @@ -209,4 +220,114 @@ public void testVerifyAggregationsMultiLevelMissingVal() Assert.assertTrue(exceptionOccured); } + + @Test + public void testWithSpecificSegmentsBasic() + { + final ImmutableList descriptors = ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ); + + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ) + ) + ) + .granularity(Granularities.ALL) + .build(), + Queries.withSpecificSegments( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + descriptors + ) + ); + } + + @Test + public void testWithSpecificSegmentsSubQueryStack() + { + final ImmutableList descriptors = ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ); + + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals(new MultipleSpecificSegmentSpec(descriptors)) + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + Queries.withSpecificSegments( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + descriptors + ) + ); + } + + @Test + public void testWithSpecificSegmentsOnUnionIsAnError() + { + final ImmutableList descriptors = ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ); + + final TimeseriesQuery query = + Druids.newTimeseriesQueryBuilder() + .dataSource(new LookupDataSource("lookyloo")) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(); + + expectedException.expect(IllegalStateException.class); + 
expectedException.expectMessage("Unable to apply specific segments to non-table-based dataSource"); + + final Query> ignored = Queries.withSpecificSegments(query, descriptors); + } } diff --git a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java index 04f86d8ad249..474f7963c58d 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java @@ -471,12 +471,14 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) segments )) { Segment segment = holder.getObject().getChunk(0).getObject(); - QueryPlus queryPlusRunning = queryPlus.withQuerySegmentSpec( - new SpecificSegmentSpec( - new SegmentDescriptor( - holder.getInterval(), - holder.getVersion(), - 0 + QueryPlus queryPlusRunning = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new SpecificSegmentSpec( + new SegmentDescriptor( + holder.getInterval(), + holder.getVersion(), + 0 + ) ) ) ); diff --git a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java index b8fa4c2b3112..778db21b0a3e 100644 --- a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java @@ -20,7 +20,6 @@ package org.apache.druid.query.filter; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableRangeSet; import com.google.common.collect.ImmutableSet; @@ -36,6 +35,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; public class DimFilterUtilsTest diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index ef112f408c52..bdb771c01273 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -31,6 +31,8 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.QueryToolChestTestHelper; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -757,6 +759,64 @@ public void testResultsAsArraysDayGran() ); } + @Test + public void testCanPerformSubqueryOnGroupBys() + { + Assert.assertTrue( + new GroupByQueryQueryToolChest(null, null).canPerformSubquery( + new GroupByQuery.Builder() + .setDataSource( + new QueryDataSource( + new GroupByQuery.Builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setGranularity(Granularities.ALL) + .build() + ) + ) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setGranularity(Granularities.ALL) + .build() + ) + ); + } + + @Test + public void testCanPerformSubqueryOnTimeseries() + { + Assert.assertFalse( + new GroupByQueryQueryToolChest(null, 
null).canPerformSubquery( + Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .granularity(Granularities.ALL) + .build() + ) + ); + } + + @Test + public void testCanPerformSubqueryOnGroupByOfTimeseries() + { + Assert.assertFalse( + new GroupByQueryQueryToolChest(null, null).canPerformSubquery( + new GroupByQuery.Builder() + .setDataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .granularity(Granularities.ALL) + .build() + ) + ) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setGranularity(Granularities.ALL) + .build() + ) + ); + } + private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueType valueType) { switch (valueType) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index b8662e01f1df..037b2b75a973 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -2983,11 +2983,15 @@ public void testMergeResults() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -3326,11 +3330,15 @@ private void doTestMergeResultsWithOrderBy( public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -4083,11 +4091,15 @@ public void testPostAggMergedHavingSpec() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new 
MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -4349,11 +4361,15 @@ public void testMergedHavingSpec() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -4426,11 +4442,15 @@ public void testMergedPostAggHavingSpec() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -9968,11 +9988,15 @@ public void testMergeResultsWithLimitPushDown() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new 
MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( @@ -10034,11 +10058,15 @@ public void testMergeResultsWithLimitPushDownSortByAgg() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( @@ -10102,11 +10130,15 @@ public void testMergeResultsWithLimitPushDownSortByDimDim() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( @@ -10183,11 +10215,15 @@ public void testMergeResultsWithLimitPushDownSortByDimAggDim() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( diff --git a/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java index 6b503a541116..aebcf257b3e5 100644 --- a/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java @@ -72,6 +72,7 @@ import java.util.List; /** + * */ @RunWith(Parameterized.class) public class 
SearchQueryRunnerTest extends InitializedNullHandlingTest @@ -167,11 +168,15 @@ public Sequence> run( ResponseContext responseContext ) { - final QueryPlus> queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-01-12/2011-02-28"))) + final QueryPlus> queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-01-12/2011-02-28"))) + ) ); - final QueryPlus> queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-03-01/2011-04-15"))) + final QueryPlus> queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-03-01/2011-04-15"))) + ) ); return Sequences.concat(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)); } diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 4b365b7ce89e..5bf4ee9c911e 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.inject.Inject; import org.apache.druid.client.selector.QueryableDruidServer; @@ -30,26 +29,28 @@ import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.EscalatedClient; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; -import org.apache.druid.query.DataSource; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.QueryWatcher; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; -import javax.annotation.Nullable; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -289,14 +290,15 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen } } - - @Nullable @Override - public VersionedIntervalTimeline getTimeline(DataSource dataSource) + public Optional> getTimeline(final DataSourceAnalysis analysis) { - String table = Iterables.getOnlyElement(dataSource.getTableNames()); + final TableDataSource tableDataSource = + analysis.getBaseTableDataSource() + .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource())); + synchronized (lock) { 
- return timelines.get(table); + return Optional.ofNullable(timelines.get(tableDataSource.getName())); } } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 9fbc354109cb..ba8b3ec09523 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -53,6 +52,7 @@ import org.apache.druid.query.BySegmentResultValueClass; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.Queries; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryMetrics; @@ -66,7 +66,8 @@ import org.apache.druid.query.aggregation.MetricManipulatorFns; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.filter.DimFilterUtils; -import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.server.QueryResource; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; @@ -88,6 +89,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -97,6 +99,7 @@ import java.util.stream.Collectors; /** + * */ public class CachingClusteredClient implements QuerySegmentWalker { @@ -231,6 +234,7 @@ private class SpecificQueryRunnable private final int uncoveredIntervalsLimit; private final Query downstreamQuery; private final Map cachePopulatorKeyMap = new HashMap<>(); + private final DataSourceAnalysis dataSourceAnalysis; private final List intervals; SpecificQueryRunnable(final QueryPlus queryPlus, final ResponseContext responseContext) @@ -248,8 +252,11 @@ private class SpecificQueryRunnable // and might blow up in some cases https://github.com/apache/druid/issues/2108 this.uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query); this.downstreamQuery = query.withOverriddenContext(makeDownstreamQueryContext()); + this.dataSourceAnalysis = DataSourceAnalysis.forDataSource(query.getDataSource()); // For nested queries, we need to look at the intervals of the inner most query. 
- this.intervals = query.getIntervalsOfInnerMostQuery(); + this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec() + .map(QuerySegmentSpec::getIntervals) + .orElse(query.getIntervals()); } private ImmutableMap makeDownstreamQueryContext() @@ -269,12 +276,14 @@ private ImmutableMap makeDownstreamQueryContext() Sequence run(final UnaryOperator> timelineConverter) { - @Nullable - TimelineLookup timeline = serverView.getTimeline(query.getDataSource()); - if (timeline == null) { + final Optional> maybeTimeline = serverView.getTimeline( + dataSourceAnalysis + ); + if (!maybeTimeline.isPresent()) { return Sequences.empty(); } - timeline = timelineConverter.apply(timeline); + + final TimelineLookup timeline = timelineConverter.apply(maybeTimeline.get()); if (uncoveredIntervalsLimit > 0) { computeUncoveredIntervals(timeline); } @@ -598,19 +607,17 @@ private void addSequencesFromServer( return; } - final MultipleSpecificSegmentSpec segmentsOfServerSpec = new MultipleSpecificSegmentSpec(segmentsOfServer); - // Divide user-provided maxQueuedBytes by the number of servers, and limit each server to that much. final long maxQueuedBytes = QueryContexts.getMaxQueuedBytes(query, httpClientConfig.getMaxQueuedBytes()); final long maxQueuedBytesPerServer = maxQueuedBytes / segmentsByServer.size(); final Sequence serverResults; if (isBySegment) { - serverResults = getBySegmentServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); + serverResults = getBySegmentServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); } else if (!server.segmentReplicatable() || !populateCache) { - serverResults = getSimpleServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); + serverResults = getSimpleServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); } else { - serverResults = getAndCacheServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); + serverResults = getAndCacheServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); } listOfSequences.add(serverResults); }); @@ -619,13 +626,15 @@ private void addSequencesFromServer( @SuppressWarnings("unchecked") private Sequence getBySegmentServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec segmentsOfServerSpec, + final List segmentsOfServer, long maxQueuedBytesPerServer ) { Sequence>> resultsBySegments = serverRunner .run( - queryPlus.withQuerySegmentSpec(segmentsOfServerSpec).withMaxQueuedBytes(maxQueuedBytesPerServer), + queryPlus.withQuery( + Queries.withSpecificSegments(queryPlus.getQuery(), segmentsOfServer) + ).withMaxQueuedBytes(maxQueuedBytesPerServer), responseContext ); // bySegment results need to be de-serialized, see DirectDruidClient.run() @@ -640,27 +649,33 @@ private Sequence getBySegmentServerResults( @SuppressWarnings("unchecked") private Sequence getSimpleServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec segmentsOfServerSpec, + final List segmentsOfServer, long maxQueuedBytesPerServer ) { return serverRunner.run( - queryPlus.withQuerySegmentSpec(segmentsOfServerSpec).withMaxQueuedBytes(maxQueuedBytesPerServer), + queryPlus.withQuery( + Queries.withSpecificSegments(queryPlus.getQuery(), segmentsOfServer) + ).withMaxQueuedBytes(maxQueuedBytesPerServer), responseContext ); } private Sequence getAndCacheServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec segmentsOfServerSpec, + final List segmentsOfServer, long maxQueuedBytesPerServer ) { 
       @SuppressWarnings("unchecked")
       final Sequence<Result<BySegmentResultValueClass<T>>> resultsBySegments = serverRunner.run(
           queryPlus
-              .withQuery((Query<Result<BySegmentResultValueClass<T>>>) downstreamQuery)
-              .withQuerySegmentSpec(segmentsOfServerSpec)
+              .withQuery(
+                  Queries.withSpecificSegments(
+                      (Query<Result<BySegmentResultValueClass<T>>>) downstreamQuery,
+                      segmentsOfServer
+                  )
+              )
               .withMaxQueuedBytes(maxQueuedBytesPerServer),
           responseContext
       );
diff --git a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java
index 8a52cacf2995..b9f1f91a5d35 100644
--- a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java
+++ b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java
@@ -24,6 +24,7 @@
 import org.apache.druid.query.LocatedSegmentDescriptor;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.timeline.TimelineLookup;
 import org.apache.druid.timeline.TimelineObjectHolder;
@@ -33,6 +34,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 /**
  */
@@ -55,13 +57,14 @@ public static List<LocatedSegmentDescriptor> getTargetLocations(
       int numCandidates
   )
   {
-    TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(datasource);
-    if (timeline == null) {
+    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(datasource);
+    final Optional<? extends TimelineLookup<String, ServerSelector>> maybeTimeline = serverView.getTimeline(analysis);
+    if (!maybeTimeline.isPresent()) {
       return Collections.emptyList();
     }
     List<LocatedSegmentDescriptor> located = new ArrayList<>();
     for (Interval interval : intervals) {
-      for (TimelineObjectHolder<String, ServerSelector> holder : timeline.lookup(interval)) {
+      for (TimelineObjectHolder<String, ServerSelector> holder : maybeTimeline.get().lookup(interval)) {
         for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
           ServerSelector selector = chunk.getObject();
           final SegmentDescriptor descriptor = new SegmentDescriptor(
diff --git a/server/src/main/java/org/apache/druid/client/TimelineServerView.java b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
index ed1d4dfb7313..477882342425 100644
--- a/server/src/main/java/org/apache/druid/client/TimelineServerView.java
+++ b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
@@ -20,22 +20,31 @@ package org.apache.druid.client;
 
 import org.apache.druid.client.selector.ServerSelector;
-import org.apache.druid.query.DataSource;
 import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineLookup;
 
-import javax.annotation.Nullable;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.Executor;
 
 /**
 */
 public interface TimelineServerView extends ServerView
 {
-  @Nullable
-  TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource);
+  /**
+   * Returns the timeline for a datasource, if it exists. The analysis object passed in must represent a scan-based
+   * datasource of a single table.
+   *
+   * @param analysis data source analysis information
+   *
+   * @return timeline, if it exists
+   *
+   * @throws IllegalStateException if 'analysis' does not represent a scan-based datasource of a single table
+   */
+  Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis);
 
   /**
    * Returns a list of {@link ImmutableDruidServer}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 451e079b26b4..fb7b8067008f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -20,7 +20,6 @@ package org.apache.druid.segment.realtime.appenderator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import org.apache.druid.client.CachingQueryRunner;
@@ -42,6 +41,7 @@
 import org.apache.druid.query.MetricsEmittingQueryRunner;
 import org.apache.druid.query.NoopQueryRunner;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
@@ -60,16 +60,19 @@
 import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.segment.realtime.plumber.Sink;
 import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.PartitionHolder;
 import org.joda.time.Interval;
 
 import java.io.Closeable;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * Query handler for indexing tasks.
+ */
 public class SinkQuerySegmentWalker implements QuerySegmentWalker
 {
   private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);
@@ -118,40 +121,17 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
   {
     final Iterable<SegmentDescriptor> specs = FunctionalIterable
         .create(intervals)
+        .transformCat(sinkTimeline::lookup)
         .transformCat(
-            new Function<Interval, Iterable<TimelineObjectHolder<String, Sink>>>()
-            {
-              @Override
-              public Iterable<TimelineObjectHolder<String, Sink>> apply(final Interval interval)
-              {
-                return sinkTimeline.lookup(interval);
-              }
-            }
-        )
-        .transformCat(
-            new Function<TimelineObjectHolder<String, Sink>, Iterable<SegmentDescriptor>>()
-            {
-              @Override
-              public Iterable<SegmentDescriptor> apply(final TimelineObjectHolder<String, Sink> holder)
-              {
-                return FunctionalIterable
-                    .create(holder.getObject())
-                    .transform(
-                        new Function<PartitionChunk<Sink>, SegmentDescriptor>()
-                        {
-                          @Override
-                          public SegmentDescriptor apply(final PartitionChunk<Sink> chunk)
-                          {
-                            return new SegmentDescriptor(
-                                holder.getInterval(),
-                                holder.getVersion(),
-                                chunk.getChunkNumber()
-                            );
-                          }
-                        }
-                    );
-              }
-            }
+            holder -> FunctionalIterable
+                .create(holder.getObject())
+                .transform(
+                    chunk -> new SegmentDescriptor(
+                        holder.getInterval(),
+                        holder.getVersion(),
+                        chunk.getChunkNumber()
+                    )
+                )
         );
 
     return getQueryRunnerForSegments(query, specs);
@@ -161,16 +141,15 @@ public SegmentDescriptor apply(final PartitionChunk<Sink> chunk)
   public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
   {
     // We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out.
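The replacement just below tightens this check: instead of pattern-matching on TableDataSource directly, the walker asks the analysis for the base table and fails hard when it is absent or names a different datasource. The shape of that validation, reduced to plain JDK types (toy names, not the real DataSourceAnalysis API):

import java.util.Optional;

public class BaseTableCheckSketch
{
  // Stands in for DataSourceAnalysis.getBaseTableDataSource(): only a plain
  // table-backed datasource yields a value here.
  static Optional<String> baseTableName(Object dataSource)
  {
    return dataSource instanceof String ? Optional.of((String) dataSource) : Optional.empty();
  }

  static void validate(Object queryDataSource, String ownedDataSource)
  {
    final Optional<String> baseTable = baseTableName(queryDataSource);
    if (!baseTable.isPresent() || !ownedDataSource.equals(baseTable.get())) {
      // Mirrors `throw new ISE("Cannot handle datasource: %s", ...)` in the patch.
      throw new IllegalStateException("Cannot handle datasource: " + queryDataSource);
    }
  }

  public static void main(String[] args)
  {
    validate("wikipedia", "wikipedia");          // passes silently
    try {
      validate("some_other_table", "wikipedia"); // throws
    }
    catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}

Throwing rather than returning a NoopQueryRunner makes a misrouted query visible to the caller instead of silently producing an empty result.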
-    if (!(query.getDataSource() instanceof TableDataSource)
-        || !dataSource.equals(((TableDataSource) query.getDataSource()).getName())) {
-      log.makeAlert("Received query for unknown dataSource")
-         .addData("dataSource", query.getDataSource())
-         .emit();
-      return new NoopQueryRunner<>();
+    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
+    final Optional<TableDataSource> baseTableDataSource = analysis.getBaseTableDataSource();
+
+    if (!baseTableDataSource.isPresent() || !dataSource.equals(baseTableDataSource.get().getName())) {
+      // Report error, since we somehow got a query for a datasource we can't handle.
+      throw new ISE("Cannot handle datasource: %s", analysis.getDataSource());
     }
 
     // Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
-    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
     if (!analysis.getPreJoinableClauses().isEmpty()) {
       throw new ISE("Cannot handle join dataSource");
     }
@@ -184,6 +163,11 @@
     final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
     final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
 
+    // Make sure this query type can handle the subquery, if present.
+    if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
+      throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
+    }
+
     Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
         specs,
         descriptor -> {
diff --git a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java
index 24f6c4435d2f..08032a5efe39 100644
--- a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java
+++ b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java
@@ -37,6 +37,7 @@
 import org.apache.druid.query.LocatedSegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.server.http.security.DatasourceResourceFilter;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthorizationUtils;
@@ -64,12 +65,14 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
+ *
 */
 @Path("/druid/v2/datasources")
 public class ClientInfoResource
@@ -152,12 +155,12 @@ KEY_METRICS, getDataSourceMetrics(dataSourceName, interval)
       theInterval = Intervals.of(interval);
     }
 
-    TimelineLookup<String, ServerSelector> timeline = timelineServerView.getTimeline(new TableDataSource(dataSourceName));
-    Iterable<TimelineObjectHolder<String, ServerSelector>> serversLookup = timeline != null ? timeline.lookup(
-        theInterval
-    ) : null;
-    if (serversLookup == null || Iterables.isEmpty(serversLookup)) {
-      return Collections.EMPTY_MAP;
+    final Optional<? extends TimelineLookup<String, ServerSelector>> maybeTimeline =
+        timelineServerView.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(dataSourceName)));
+    final Optional<Iterable<TimelineObjectHolder<String, ServerSelector>>> maybeServersLookup =
+        maybeTimeline.map(timeline -> timeline.lookup(theInterval));
+    if (!maybeServersLookup.isPresent() || Iterables.isEmpty(maybeServersLookup.get())) {
+      return Collections.emptyMap();
     }
     Map<Interval, Object> servedIntervals = new TreeMap<>(
         new Comparator<Interval>()
@@ -174,7 +177,7 @@ public int compare(Interval o1, Interval o2)
     );
 
-    for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
+    for (TimelineObjectHolder<String, ServerSelector> holder : maybeServersLookup.get()) {
       final Set<String> dimensions = new HashSet<>();
       final Set<String> metrics = new HashSet<>();
       final PartitionHolder<ServerSelector> partitionHolder = holder.getObject();
diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index ddc15aedef07..b484486c03bf 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -43,7 +43,7 @@
 import org.joda.time.Interval;
 
 /**
- *
+ * Query handler for Broker processes (see CliBroker).
 */
 public class ClientQuerySegmentWalker implements QuerySegmentWalker
 {
@@ -56,7 +56,6 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
   private final Cache cache;
   private final CacheConfig cacheConfig;
-
   @Inject
   public ClientQuerySegmentWalker(
       ServiceEmitter emitter,
@@ -82,25 +81,27 @@ public ClientQuerySegmentWalker(
   @Override
   public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
   {
-    // Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
     final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
-    if (!analysis.getPreJoinableClauses().isEmpty()) {
-      throw new ISE("Cannot handle join dataSource");
-    }
-    return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals));
+    if (analysis.isConcreteTableBased()) {
+      return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals));
+    } else {
+      // In the future, we will check here to see if parts of the query are inlinable, and if that inlining would
+      // be able to create a concrete table-based query that we can run through the distributed query stack.
+      throw new ISE("Query dataSource is not table-based, cannot run");
+    }
   }
 
   @Override
   public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
   {
-    // Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
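getQueryRunnerForSegments, rewritten next, goes through the same gate that getQueryRunnerForIntervals acquired above: proceed only when the analysis reports a concrete table-based datasource. The gating pattern in isolation (simplified signatures; the real predicate is DataSourceAnalysis.isConcreteTableBased()):

import java.util.function.Supplier;

public class ConcreteTableGateSketch
{
  interface Analysis
  {
    boolean isConcreteTableBased();
  }

  // Stands in for makeRunner(query, baseClient.getQueryRunnerFor...): the distributed
  // stack is only reached once the gate passes.
  static <T> T runIfConcrete(Analysis analysis, Supplier<T> makeRunner)
  {
    if (analysis.isConcreteTableBased()) {
      return makeRunner.get();
    } else {
      throw new IllegalStateException("Query dataSource is not table-based, cannot run");
    }
  }

  public static void main(String[] args)
  {
    System.out.println(runIfConcrete(() -> true, () -> "runner created"));
  }
}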
     final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
-    if (!analysis.getPreJoinableClauses().isEmpty()) {
-      throw new ISE("Cannot handle join dataSource");
-    }
-    return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs));
+    if (analysis.isConcreteTableBased()) {
+      return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs));
+    } else {
+      throw new ISE("Query dataSource is not table-based, cannot run");
+    }
   }
 
   private <T> QueryRunner<T> makeRunner(Query<T> query, QueryRunner<T> baseClientRunner)
@@ -126,9 +127,7 @@ private <T> QueryRunner<T> makeRunner(
   {
     PostProcessingOperator<T> postProcessing = objectMapper.convertValue(
         query.getContextValue("postProcessing"),
-        new TypeReference<PostProcessingOperator<T>>()
-        {
-        }
+        new TypeReference<PostProcessingOperator<T>>() {}
     );
 
     return new FluentQueryRunnerBuilder<>(toolChest)
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index dbb48474defc..45b6538bb610 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -23,7 +23,10 @@
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
 import org.apache.druid.common.guava.SettableSupplier;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.loading.SegmentLoader;
@@ -35,8 +38,8 @@
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.utils.CollectionUtils;
 
-import javax.annotation.Nullable;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -134,19 +137,30 @@ public boolean isSegmentCached(final DataSegment segment)
     return segmentLoader.isSegmentLoaded(segment);
   }
 
-  @Nullable
-  public VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimeline(String dataSource)
+  /**
+   * Returns the timeline for a datasource, if it exists. The analysis object passed in must represent a scan-based
+   * datasource of a single table.
+   *
+   * @param analysis data source analysis information
+   *
+   * @return timeline, if it exists
+   *
+   * @throws IllegalStateException if 'analysis' does not represent a scan-based datasource of a single table
+   */
+  public Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> getTimeline(DataSourceAnalysis analysis)
   {
-    final DataSourceState dataSourceState = dataSources.get(dataSource);
-    return dataSourceState == null ? null : dataSourceState.getTimeline();
+    final TableDataSource tableDataSource =
+        analysis.getBaseTableDataSource()
+                .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
+
+    return Optional.ofNullable(dataSources.get(tableDataSource.getName())).map(DataSourceState::getTimeline);
   }
 
   /**
    * Load a single segment.
    *
    * @param segment segment to load
-   *
-   * @param lazy whether to lazy load columns metadata
+   * @param lazy    whether to lazy load columns metadata
    *
    * @return true if the segment was newly loaded, false if it was already loaded
    *
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index 41c71160e5ec..d4c672c91f4b 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -20,8 +20,6 @@ package org.apache.druid.server.coordination;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 import org.apache.druid.client.CachingQueryRunner;
 import org.apache.druid.client.cache.Cache;
@@ -35,7 +33,6 @@
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.BySegmentQueryRunner;
 import org.apache.druid.query.CPUTimeMetricQueryRunner;
-import org.apache.druid.query.DataSource;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
 import org.apache.druid.query.MetricsEmittingQueryRunner;
 import org.apache.druid.query.NoopQueryRunner;
@@ -52,7 +49,6 @@
 import org.apache.druid.query.ReferenceCountingSegmentQueryRunner;
 import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
@@ -61,17 +57,19 @@
 import org.apache.druid.server.SetAndVerifyContextQueryRunner;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.PartitionHolder;
 import org.joda.time.Interval;
 
-import javax.annotation.Nullable;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * Query handler for Historical processes (see CliHistorical).
+ */
 public class ServerManager implements QuerySegmentWalker
 {
   private static final EmittingLogger log = new EmittingLogger(ServerManager.class);
@@ -111,110 +109,49 @@ public ServerManager(
     this.serverConfig = serverConfig;
   }
 
-  private DataSource getInnerMostDataSource(DataSource dataSource)
-  {
-    if (dataSource instanceof QueryDataSource) {
-      return getInnerMostDataSource(((QueryDataSource) dataSource).getQuery().getDataSource());
-    }
-    return dataSource;
-  }
-
   @Override
   public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
   {
-    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
-    if (factory == null) {
-      throw new ISE("Unknown query type[%s].", query.getClass());
-    }
-
-    // Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
     final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
     if (!analysis.getPreJoinableClauses().isEmpty()) {
       throw new ISE("Cannot handle join dataSource");
     }
 
-    final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
-    final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
-
-    DataSource dataSource = getInnerMostDataSource(query.getDataSource());
-    if (!(dataSource instanceof TableDataSource)) {
-      throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported");
-    }
-    String dataSourceName = getDataSourceName(dataSource);
-
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getTimeline(
-        dataSourceName
-    );
+    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
+    final Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> maybeTimeline =
+        segmentManager.getTimeline(analysis);
 
-    if (timeline == null) {
-      return new NoopQueryRunner<T>();
+    if (maybeTimeline.isPresent()) {
+      timeline = maybeTimeline.get();
+    } else {
+      // Note: this is not correct when there's a right or full outer join going on.
+      // See https://github.com/apache/druid/issues/9229 for details.
+      return new NoopQueryRunner<>();
     }
 
-    FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
+    FunctionalIterable<SegmentDescriptor> segmentDescriptors = FunctionalIterable
         .create(intervals)
+        .transformCat(timeline::lookup)
         .transformCat(
-            new Function<Interval, Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>>>()
-            {
-              @Override
-              public Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>> apply(Interval input)
-              {
-                return timeline.lookup(input);
-              }
-            }
-        )
-        .transformCat(
-            new Function<TimelineObjectHolder<String, ReferenceCountingSegment>, Iterable<QueryRunner<T>>>()
-            {
-              @Override
-              public Iterable<QueryRunner<T>> apply(
-                  @Nullable final TimelineObjectHolder<String, ReferenceCountingSegment> holder
-              )
-              {
-                if (holder == null) {
-                  return null;
-                }
-
-                return FunctionalIterable
-                    .create(holder.getObject())
-                    .transform(
-                        new Function<PartitionChunk<ReferenceCountingSegment>, QueryRunner<T>>()
-                        {
-                          @Override
-                          public QueryRunner<T> apply(PartitionChunk<ReferenceCountingSegment> input)
-                          {
-                            return buildAndDecorateQueryRunner(
-                                factory,
-                                toolChest,
-                                input.getObject(),
-                                new SegmentDescriptor(
-                                    holder.getInterval(),
-                                    holder.getVersion(),
-                                    input.getChunkNumber()
-                                ),
-                                cpuTimeAccumulator
-                            );
-                          }
-                        }
-                    );
+            holder -> {
+              if (holder == null) {
+                return null;
              }
+
+              return FunctionalIterable
+                  .create(holder.getObject())
+                  .transform(
+                      partitionChunk ->
+                          new SegmentDescriptor(
+                              holder.getInterval(),
+                              holder.getVersion(),
+                              partitionChunk.getChunkNumber()
+                          )
+                  );
            }
         );
 
-    return CPUTimeMetricQueryRunner.safeBuild(
-        new FinalizeResultsQueryRunner<>(
-            toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
-            toolChest
-        ),
-        toolChest,
-        emitter,
-        cpuTimeAccumulator,
-        true
-    );
-  }
-
-  private String getDataSourceName(DataSource dataSource)
-  {
-    return Iterables.getOnlyElement(dataSource.getTableNames());
+    return getQueryRunnerForSegments(query, segmentDescriptors);
   }
 
   @Override
@@ -225,7 +162,7 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
-      return new NoopQueryRunner<T>();
+      return new NoopQueryRunner<>();
     }
 
     // Sanity check: we cannot actually handle joins yet, so detect them and throw an error.
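With the rewrite above, getQueryRunnerForIntervals no longer builds runners itself; it only resolves intervals into SegmentDescriptors and delegates, so the decoration logic (reference counting, caching, metrics) lives in one place. The flattening step restated over plain collections, where FunctionalIterable.transformCat plays the role of flatMap (toy holder/chunk types, not Druid's timeline classes):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DescriptorFlatteningSketch
{
  static class Chunk
  {
    final int chunkNumber;

    Chunk(int chunkNumber)
    {
      this.chunkNumber = chunkNumber;
    }
  }

  static class Holder
  {
    final String interval;
    final String version;
    final List<Chunk> chunks;

    Holder(String interval, String version, List<Chunk> chunks)
    {
      this.interval = interval;
      this.version = version;
      this.chunks = chunks;
    }
  }

  // holders -> chunks -> descriptors; the real code emits
  // new SegmentDescriptor(holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()).
  static List<String> toDescriptors(List<Holder> lookupResult)
  {
    return lookupResult.stream()
                       .flatMap(h -> h.chunks.stream()
                                             .map(c -> h.interval + "/" + h.version + "/" + c.chunkNumber))
                       .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    List<Holder> holders = Arrays.asList(
        new Holder("2020-01-01/2020-01-02", "v1", Arrays.asList(new Chunk(0), new Chunk(1)))
    );
    System.out.println(toDescriptors(holders)); // [2020-01-01/2020-01-02/v1/0, 2020-01-01/2020-01-02/v1/1]
  }
}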
@@ -235,48 +172,53 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
 
     final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
+    final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
 
-    String dataSourceName = getDataSourceName(query.getDataSource());
-
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getTimeline(
-        dataSourceName
-    );
+    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
+    final Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> maybeTimeline =
+        segmentManager.getTimeline(analysis);
 
-    if (timeline == null) {
-      return new NoopQueryRunner<T>();
+    // Make sure this query type can handle the subquery, if present.
+    if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
+      throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
     }
 
-    final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
+    if (maybeTimeline.isPresent()) {
+      timeline = maybeTimeline.get();
+    } else {
+      // Note: this is not correct when there's a right or full outer join going on.
+      // See https://github.com/apache/druid/issues/9229 for details.
+      return new NoopQueryRunner<>();
+    }
 
     FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
         .create(specs)
         .transformCat(
-            new Function<SegmentDescriptor, Iterable<QueryRunner<T>>>()
-            {
-              @Override
-              @SuppressWarnings("unchecked")
-              public Iterable<QueryRunner<T>> apply(SegmentDescriptor input)
-              {
-
-                final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
-                    input.getInterval(), input.getVersion()
-                );
-
-                if (entry == null) {
-                  return Collections.singletonList(
-                      new ReportTimelineMissingSegmentQueryRunner<T>(input));
-                }
-
-                final PartitionChunk<ReferenceCountingSegment> chunk = entry.getChunk(input.getPartitionNumber());
-                if (chunk == null) {
-                  return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<T>(input));
-                }
-
-                final ReferenceCountingSegment adapter = chunk.getObject();
-                return Collections.singletonList(
-                    buildAndDecorateQueryRunner(factory, toolChest, adapter, input, cpuTimeAccumulator)
-                );
+            descriptor -> {
+              final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
+                  descriptor.getInterval(),
+                  descriptor.getVersion()
+              );
+
+              if (entry == null) {
+                return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor));
              }
+
+              final PartitionChunk<ReferenceCountingSegment> chunk = entry.getChunk(descriptor.getPartitionNumber());
+              if (chunk == null) {
+                return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor));
+              }
+
+              final ReferenceCountingSegment segment = chunk.getObject();
+              return Collections.singletonList(
+                  buildAndDecorateQueryRunner(
+                      factory,
+                      toolChest,
+                      segment,
+                      descriptor,
+                      cpuTimeAccumulator
+                  )
+              );
            }
         );
diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index b630b8ec6c31..dd3961e04f42 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -39,6 +39,7 @@
 import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.QueryWatcher;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
@@ -114,7 +115,9 @@ public void testSingleServerAddedRemovedSegment() throws Exception
     Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
     Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
 
-    TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view"));
+    TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
+        DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
+    ).get();
     List<TimelineObjectHolder<String, ServerSelector>> serverLookupRes = (List<TimelineObjectHolder<String, ServerSelector>>) timeline.lookup(
         Intervals.of(
             "2014-10-20T00:00:00Z/P1D"
@@ -203,7 +206,9 @@ public DataSegment apply(Pair<String, Interval> input)
     Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
     Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
 
-    TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view"));
+    TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
+        DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
+    ).get();
     assertValues(
         Arrays.asList(
             createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
@@ -224,7 +229,9 @@
     // renew segmentRemovedLatch since we still have 4 segments to unannounce
     segmentRemovedLatch = new CountDownLatch(4);
 
-    timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view"));
+    timeline = brokerServerView.getTimeline(
+        DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
+    ).get();
     assertValues(
         Arrays.asList(
             createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index a75ad64e080c..3efe7bc5d759 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -38,7 +38,6 @@
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.query.DataSource;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.Query;
@@ -47,8 +46,10 @@
 import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineLookup;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
@@ -64,11 +65,13 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 
 /**
+ *
 */
 public class CachingClusteredClientFunctionalityTest
 {
@@ -245,9 +248,9 @@ public void registerSegmentCallback(Executor exec, SegmentCallback callback)
       }
 
       @Override
-      public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
+      public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
       {
-        return timeline;
+        return Optional.of(timeline);
       }
 
       @Nullable
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index bd91104aac41..9c0588837ffd 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -68,7 +68,6 @@
 import org.apache.druid.java.util.common.guava.nary.TrinaryFn;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.BySegmentResultValueClass;
-import org.apache.druid.query.DataSource;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
@@ -98,6 +97,7 @@
 import org.apache.druid.query.groupby.ResultRow;
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.search.SearchHit;
 import org.apache.druid.query.search.SearchQuery;
 import org.apache.druid.query.search.SearchQueryConfig;
@@ -148,6 +148,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -2391,9 +2392,9 @@ public void registerSegmentCallback(Executor exec, SegmentCallback callback)
       }
 
       @Override
-      public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
+      public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
       {
-        return timeline;
+        return Optional.of(timeline);
       }
 
       @Override
diff --git a/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java b/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java
index 60b633d22624..f6c880c0e205 100644
--- a/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java
@@ -30,8 +30,8 @@
 import org.apache.druid.client.selector.RandomServerSelectorStrategy;
 import org.apache.druid.client.selector.ServerSelector;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.timeline.DataSegment;
@@ -48,6 +48,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class ClientInfoResourceTest
 {
@@ -128,7 +129,8 @@ public void setup()
     EasyMock.expect(serverInventoryView.getInventory()).andReturn(ImmutableList.of(server)).anyTimes();
 
     timelineServerView = EasyMock.createMock(TimelineServerView.class);
-    EasyMock.expect(timelineServerView.getTimeline(EasyMock.anyObject(TableDataSource.class))).andReturn(timeline);
+    EasyMock.expect(timelineServerView.getTimeline(EasyMock.anyObject(DataSourceAnalysis.class)))
+            .andReturn((Optional) Optional.of(timeline));
 
     EasyMock.replay(serverInventoryView, timelineServerView);
 
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index b6c9c1671fc3..04f796e9918d 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -24,6 +24,8 @@
 import com.google.common.collect.Ordering;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.MapUtils;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.AbstractSegment;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.ReferenceCountingSegment;
@@ -49,6 +51,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -382,7 +385,10 @@ public void testRemoveEmptyTimeline() throws SegmentLoadingException
   @Test
   public void testGetNonExistingTimeline()
   {
-    Assert.assertNull(segmentManager.getTimeline("nonExisting"));
+    Assert.assertEquals(
+        Optional.empty(),
+        segmentManager.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource("nonExisting")))
+    );
   }
 
   @Test
@@ -448,7 +454,10 @@ private void assertResult(List<DataSegment> expectedExistingSegments) throws SegmentLoadingException
     dataSources.forEach(
         (sourceName, dataSourceState) -> {
           Assert.assertEquals(expectedDataSourceCounts.get(sourceName).longValue(), dataSourceState.getNumSegments());
-          Assert.assertEquals(expectedDataSourceSizes.get(sourceName).longValue(), dataSourceState.getTotalSegmentSize());
+          Assert.assertEquals(
+              expectedDataSourceSizes.get(sourceName).longValue(),
+              dataSourceState.getTotalSegmentSize()
+          );
           Assert.assertEquals(
               expectedDataSources.get(sourceName).getAllTimelineEntries(),
               dataSourceState.getTimeline().getAllTimelineEntries()
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java
index 1910bc27b654..5aacb8773f37 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java
@@ -37,8 +37,9 @@
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.math.expr.Evals;
 import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.planning.DataSourceAnalysis;
+import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.column.ColumnHolder;
@@ -48,6 +49,7 @@
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.joda.time.DateTime;
+import org.joda.time.Interval;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -87,8 +89,7 @@ public Sequence<Object[]> runQuery(final DruidQuery druidQuery)
     final Query<?> query = druidQuery.getQuery();
 
     if (plannerContext.getPlannerConfig().isRequireTimeCondition()) {
-      final Query<?> innerMostQuery = findInnerMostQuery(query);
-      if (innerMostQuery.getIntervals().equals(Intervals.ONLY_ETERNITY)) {
+      if (Intervals.ONLY_ETERNITY.equals(findBaseDataSourceIntervals(query))) {
         throw new CannotBuildQueryException(
             "requireTimeCondition is enabled, all queries must include a filter condition on the __time column"
         );
@@ -121,13 +122,12 @@ public Sequence<Object[]> runQuery(final DruidQuery druidQuery)
     );
   }
 
-  private Query<?> findInnerMostQuery(Query<?> outerQuery)
+  private List<Interval> findBaseDataSourceIntervals(Query<?> query)
   {
-    Query<?> query = outerQuery;
-    while (query.getDataSource() instanceof QueryDataSource) {
-      query = ((QueryDataSource) query.getDataSource()).getQuery();
-    }
-    return query;
+    return DataSourceAnalysis.forDataSource(query.getDataSource())
+                             .getBaseQuerySegmentSpec()
+                             .map(QuerySegmentSpec::getIntervals)
+                             .orElse(query.getIntervals());
   }
 
   private <T> Sequence<Object[]> execute(Query<T> query, final List<String> newFields, final List<SqlTypeName> newTypes)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index 0f36273035a7..a8e498beb1f0 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -26,8 +26,8 @@
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.client.TimelineServerView;
 import org.apache.druid.client.selector.ServerSelector;
-import org.apache.druid.query.DataSource;
 import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
@@ -37,6 +37,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.Executor;
 
 /**
@@ -77,7 +78,7 @@ public TestServerInventoryView(List<DataSegment> segments, List<DataSegment> realtimeSegments)
   }
 
   @Override
-  public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
+  public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
   {
     throw new UnsupportedOperationException();
   }
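Across the test files above, adapting to the new API is mostly mechanical: real implementations wrap their map lookup in Optional.ofNullable, while fakes either return Optional.of(timeline) or keep throwing. A minimal fake in that spirit (toy interface standing in for TimelineServerView, not the real class):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class FakeTimelineViewSketch
{
  interface TimelineView
  {
    Optional<String> getTimeline(String baseTableName); // toy: the timeline is just a string
  }

  static class FakeView implements TimelineView
  {
    private final Map<String, String> timelines = new HashMap<>();

    void add(String table, String timeline)
    {
      timelines.put(table, timeline);
    }

    @Override
    public Optional<String> getTimeline(String baseTableName)
    {
      // Same shape as Optional.ofNullable(timelines.get(...)) in the server implementations.
      return Optional.ofNullable(timelines.get(baseTableName));
    }
  }

  public static void main(String[] args)
  {
    FakeView view = new FakeView();
    view.add("wikipedia", "timeline-v1");
    System.out.println(view.getTimeline("wikipedia"));   // Optional[timeline-v1]
    System.out.println(view.getTimeline("nonExisting")); // Optional.empty
  }
}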