Starter set of scan query operators
paul-rogers committed Jun 19, 2022
1 parent a45f687 commit 2eb2490
Showing 26 changed files with 225 additions and 80 deletions.
@@ -330,4 +330,10 @@ public Sequence<Object[]> resultsAsArrays(QueryType query, Sequence<ResultType>
{
throw new UOE("Query type '%s' does not support returning results as arrays", query.getType());
}

@SuppressWarnings("unchecked")
public Sequence<Object[]> resultsAsArrays(QueryPlus<ResultType> query, Sequence<ResultType> resultSequence)
{
return resultsAsArrays((QueryType) query.getQuery(), resultSequence);
}
}
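The new default overload lets callers that hold a QueryPlus convert results without repeating the unchecked cast at every call site. A minimal sketch of a hypothetical caller (the toArrays helper and its names are illustrative, not part of this commit):

import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryToolChest;

public class ResultsAsArraysExample
{
  // Hypothetical helper: the tool chest unwraps the QueryPlus itself, so the
  // caller no longer casts queryPlus.getQuery() to the concrete query type.
  static <T> Sequence<Object[]> toArrays(
      QueryToolChest<T, ?> toolChest,
      QueryPlus<T> queryPlus,
      Sequence<T> results)
  {
    return toolChest.resultsAsArrays(queryPlus, results);
  }
}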
@@ -68,7 +68,7 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
this.limit = query.getScanRowsLimit();
Query<ScanResultValue> historicalQuery =
queryPlus.getQuery().withOverriddenContext(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false));
Sequence<ScanResultValue> baseSequence = baseRunner.run(QueryPlus.wrap(historicalQuery), responseContext);
Sequence<ScanResultValue> baseSequence = baseRunner.run(queryPlus.withQuery(historicalQuery), responseContext);
this.yielder = baseSequence.toYielder(
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()
@@ -32,9 +32,12 @@
import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.queryng.config.QueryNGConfig;
import org.apache.druid.queryng.planner.ScanPlanner;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@@ -66,6 +69,9 @@ public ScanQueryQueryToolChest(
public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultValue> runner)
{
return (queryPlus, responseContext) -> {
if (QueryNGConfig.enabledFor(queryPlus)) {
return ScanPlanner.runLimitAndOffset(queryPlus, runner, responseContext, scanQueryConfig);
}
final ScanQuery originalQuery = ((ScanQuery) (queryPlus.getQuery()));
ScanQuery.verifyOrderByForNativeExecution(originalQuery);

@@ -213,7 +219,7 @@ public Sequence<Object[]> resultsAsArrays(final ScanQuery query, final Sequence<
// Uh oh... mismatch in expected and actual field count. I don't think this should happen, so let's
// throw an exception. If this really does happen, and there's a good reason for it, then we should remap
// the result row here.
throw new ISE("Mismatch in expected[%d] vs actual[%s] field count", fields.size(), row.size());
throw new ISE("Mismatch in expected [%d] vs actual [%s] field count", fields.size(), row.size());
}
};
break;
@@ -230,4 +236,17 @@ public Sequence<Object[]> resultsAsArrays(final ScanQuery query, final Sequence<
}
);
}

@Override
@SuppressWarnings("unchecked")
public Sequence<Object[]> resultsAsArrays(QueryPlus<ScanResultValue> queryPlus, Sequence<ScanResultValue> resultSequence)
{
ScanQuery query = (ScanQuery) queryPlus.getQuery();
if (QueryNGConfig.enabledFor(queryPlus)) {
final List<String> fields = resultArraySignature(query).getColumnNames();
return ScanPlanner.resultsAsArrays(queryPlus, fields, resultSequence);
} else {
return resultsAsArrays(query, resultSequence);
}
}
}
@@ -46,6 +46,8 @@
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.queryng.config.QueryNGConfig;
import org.apache.druid.queryng.planner.ScanPlanner;
import org.apache.druid.segment.Segment;
import org.joda.time.Interval;

@@ -87,8 +89,18 @@ public QueryRunner<ScanResultValue> mergeRunners(
final Iterable<QueryRunner<ScanResultValue>> queryRunners
)
{
// in single thread and in Jetty thread instead of processing thread
// In single thread and in Jetty thread instead of processing thread
return (queryPlus, responseContext) -> {
if (QueryNGConfig.enabledFor(queryPlus)) {
Sequence<ScanResultValue> results = ScanPlanner.runMerge(
queryPlus,
queryRunners,
responseContext);
if (results != null) {
return results;
}
}

ScanQuery query = (ScanQuery) queryPlus.getQuery();
ScanQuery.verifyOrderByForNativeExecution(query);

@@ -147,7 +159,7 @@ public QueryRunner<ScanResultValue> mergeRunners(
((SinkQueryRunners<ScanResultValue>) queryRunners).runnerIntervalMappingIterator()
.forEachRemaining(intervalsAndRunnersOrdered::add);
} else {
throw new ISE("Number of segment descriptors does not equal number of "
throw new ISE("Number of segment descriptors does not equal the number of "
+ "query runners...something went wrong!");
}

@@ -295,7 +307,7 @@ List<Interval> getIntervalsFromSpecificQuerySpec(QuerySegmentSpec spec)
} else {
throw new UOE(
"Time-ordering on scan queries is only supported for queries with segment specs "
+ "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.",
+ "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec. A [%s] was received instead.",
spec.getClass().getSimpleName()
);
}
@@ -310,8 +322,8 @@ Sequence<ScanResultValue> nWayMergeAndLimit(
)
{
// Starting from the innermost Sequences.map:
// (1) Deaggregate each ScanResultValue returned by the query runners
// (2) Combine the deaggregated ScanResultValues into a single sequence
// (1) Disaggregate each ScanResultValue returned by the query runners
// (2) Combine the disaggregated ScanResultValues into a single sequence
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence
@@ -361,11 +373,14 @@ public ScanQueryRunner(ScanQueryEngine engine, Segment segment)
@Override
public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, ResponseContext responseContext)
{
if (QueryNGConfig.enabledFor(queryPlus)) {
return ScanPlanner.runScan(queryPlus, segment, responseContext);
}

Query<ScanResultValue> query = queryPlus.getQuery();
if (!(query instanceof ScanQuery)) {
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class);
}

ScanQuery.verifyOrderByForNativeExecution((ScanQuery) query);

// it happens in unit tests
@@ -32,22 +32,25 @@ public class QueryNGConfig
@SuppressWarnings("unused") // To be used later
public static final String CONFIG_ROOT = "druid.queryng";

public static final String CONTEXT_VAR = "queryng";

/**
* Whether the engine is enabled. It is disabled by default.
*/
@JsonProperty("enabled")
private boolean enabled;

public static final String CONTEXT_VAR = "queryng";
@JsonProperty("requireContext")
private boolean requireContext = true;

/**
* Create an instance for testing.
*/
@SuppressWarnings("unused") // To be used later
public static QueryNGConfig create(boolean enabled)
public static QueryNGConfig create(boolean enabled, boolean requireContext)
{
QueryNGConfig config = new QueryNGConfig();
config.enabled = enabled;
config.requireContext = requireContext;
return config;
}

@@ -57,26 +60,29 @@ public boolean enabled()
}

/**
* Determine if Query NG should be enabled for the given query;
* that is, if the query should have a fragment context attached.
* Determine if Query NG should be enabled for the given query. Only scan
* queries are currently supported. For safety, the default config also
* requires that a context variable be set to enable the operator-based
* engine. However, the configuration can skip the context check. At present,
* the skip-context option is primarily for testing.
* that is, if the query should have a fragment context attached.
* At present, Query NG is enabled if the query is a scan query and
* the query has the "queryng" context variable set. The caller
* should already have checked if the Query NG engine is enabled
* globally. If Query NG is enabled for a query, then the caller
* will attach a fragment context to the query's QueryPlus.
*/
public static boolean isEnabled(Query<?> query)
public boolean isEnabled(Query<?> query)
{
// Query has to be of the currently-supported type
if (!(query instanceof ScanQuery)) {
return false;
}
return query.getContextBoolean(CONTEXT_VAR, false);
return enabled
&& (query instanceof ScanQuery)
&& (!requireContext || query.getContextBoolean(CONTEXT_VAR, false));
}

/**
* Determine if the Query NG (operator-based) engine is enabled for the
* given query (given as a QueryPlus). Query NG is enabled if the QueryPlus
* Determine if the Query NG (operator-based) engine is enabled for the given
* query (given as a QueryPlus). Query NG is enabled if the QueryPlus
* includes the fragment context needed by the Query NG engine.
*/
public static boolean enabledFor(final QueryPlus<?> queryPlus)
@@ -46,12 +46,8 @@ public FragmentBuilder create(
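Taken together, the enabled and requireContext flags gate the new engine per query: the query must be a scan query, the engine must be enabled, and (unless requireContext is switched off) the "queryng" context variable must be set. A minimal sketch of the resulting behavior, built from the same query-construction calls the tests later in this commit use (class and variable names are illustrative):

import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.queryng.config.QueryNGConfig;

public class QueryNGGatingExample
{
  public static void main(String[] args)
  {
    // A scan query without the gating context variable.
    Query<?> scanQuery = new Druids.ScanQueryBuilder()
        .dataSource("foo")
        .eternityInterval()
        .build();

    // The same query with the "queryng" context variable set.
    Query<?> scanQueryWithContext = scanQuery.withOverriddenContext(
        ImmutableMap.of(QueryNGConfig.CONTEXT_VAR, true));

    // Enabled, and the context variable is required (the safe default).
    QueryNGConfig config = QueryNGConfig.create(true, true);
    System.out.println(config.isEnabled(scanQuery));            // false: no context flag
    System.out.println(config.isEnabled(scanQueryWithContext)); // true

    // Enabled for all scan queries regardless of context (primarily for testing).
    config = QueryNGConfig.create(true, false);
    System.out.println(config.isEnabled(scanQuery));            // true
  }
}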
final Query<?> query,
final ResponseContext responseContext)
{
// Engine has to be enabled
if (!config.enabled()) {
return null;
}
// Client must explicitly ask for the engine
if (!QueryNGConfig.isEnabled(query)) {
// Config imposes a number of obstacles.
if (!config.isEnabled(query)) {
return null;
}
// Only then do we create a fragment builder which, implicitly,
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.queryng.fragment;

import org.apache.druid.query.Query;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.queryng.config.QueryNGConfig;

/**
* Test version of the fragment factory which enables Query NG only if
* the {@code druid.queryng.enabled} system property is set, and then,
* only for scan queries.
*/
public class TestFragmentBuilderFactory implements FragmentBuilderFactory
{
private static final String ENABLED_KEY = QueryNGConfig.CONFIG_ROOT + ".enabled";
private static final boolean ENABLED = Boolean.parseBoolean(System.getProperty(ENABLED_KEY));

@Override
public FragmentBuilder create(Query<?> query, ResponseContext responseContext)
{
//if (!ENABLED) {
// return null;
//}
if (!(query instanceof ScanQuery)) {
return null;
}
return new FragmentBuilderImpl(query.getId(), 0, responseContext);
}
}
@@ -25,7 +25,7 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.queryng.config.QueryNGConfig;
import org.apache.druid.queryng.fragment.FragmentBuilderFactory;
import org.apache.druid.queryng.fragment.FragmentBuilderFactoryImpl;
import org.apache.druid.queryng.fragment.NullFragmentBuilderFactory;

/**
* Configure the "shim" version of the NG query engine which entails
@@ -42,7 +42,8 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, QueryNGConfig.CONFIG_ROOT, QueryNGConfig.class);
binder
.bind(FragmentBuilderFactory.class)
.to(FragmentBuilderFactoryImpl.class)
// Query NG disabled in production nodes for now.
.to(NullFragmentBuilderFactory.class)
.in(LazySingleton.class);
}
}
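For reference, turning the operator-based engine back on for a node amounts to restoring the original binding, so that QueryNGConfig (rather than the NullFragmentBuilderFactory stub) decides per query whether Query NG runs. A minimal sketch, assuming the standard Guice Module API (the module name is hypothetical):

import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.queryng.fragment.FragmentBuilderFactory;
import org.apache.druid.queryng.fragment.FragmentBuilderFactoryImpl;

// Hypothetical module that binds the real factory instead of the null stub.
public class QueryNGEnabledModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    binder
        .bind(FragmentBuilderFactory.class)
        .to(FragmentBuilderFactoryImpl.class)
        .in(LazySingleton.class);
  }
}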
@@ -62,7 +62,7 @@ public Object[] next() throws EofException
// happen, and there's a good reason for it, then we should remap
// the result row here.
throw new ISE(
"Mismatch in expected [%d] vs actual [%s] field count",
"Mismatch in expected [%d] vs. actual [%s] field count",
fields.size(),
row.size());
}
@@ -19,14 +19,17 @@

package org.apache.druid.queryng.operators;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceTestHelper;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.queryng.config.QueryNGConfig;
import org.apache.druid.queryng.fragment.FragmentBuilder;
import org.apache.druid.queryng.fragment.FragmentBuilderFactory;
@@ -294,6 +297,43 @@ public void testRunEmptyHandleAsSequence()
SequenceTestHelper.testAccumulation("empty", seq, Collections.emptyList());
}

@Test
public void testConfig()
{
Query<?> scanQuery = new Druids.ScanQueryBuilder()
.dataSource("foo")
.eternityInterval()
.build();
Query<?> scanQueryWithContext = scanQuery.withOverriddenContext(
ImmutableMap.of(QueryNGConfig.CONTEXT_VAR, true));
Query<?> otherQuery = Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
.intervals(new MultipleIntervalSegmentSpec(
ImmutableList.of(Intervals.ETERNITY)))
.build();

// Completely disabled.
QueryNGConfig config = QueryNGConfig.create(false, false);
assertFalse(config.enabled());
assertFalse(config.isEnabled(scanQuery));
assertFalse(config.isEnabled(scanQueryWithContext));
assertFalse(config.isEnabled(otherQuery));

// Enabled, but only for scan.
config = QueryNGConfig.create(true, false);
assertTrue(config.enabled());
assertTrue(config.isEnabled(scanQuery));
assertTrue(config.isEnabled(scanQueryWithContext));
assertFalse(config.isEnabled(otherQuery));

// Enabled, but only for scan, and only if requested in context.
config = QueryNGConfig.create(true, true);
assertTrue(config.enabled());
assertFalse(config.isEnabled(scanQuery));
assertTrue(config.isEnabled(scanQueryWithContext));
assertFalse(config.isEnabled(otherQuery));
}

@Test
public void testFactory()
{
@@ -303,13 +343,13 @@ public void testFactory()
.build();

// Operators blocked by query: no gating context variable
QueryNGConfig enableConfig = QueryNGConfig.create(true);
QueryNGConfig enableConfig = QueryNGConfig.create(true, true);
assertTrue(enableConfig.enabled());
FragmentBuilderFactory enableFactory = new FragmentBuilderFactoryImpl(enableConfig);
assertNull(enableFactory.create(query, ResponseContext.createEmpty()));
FragmentBuilderFactory nullFactory = new NullFragmentBuilderFactory();

QueryNGConfig disableConfig = QueryNGConfig.create(false);
QueryNGConfig disableConfig = QueryNGConfig.create(false, false);
assertFalse(disableConfig.enabled());
FragmentBuilderFactory disableFactory = new FragmentBuilderFactoryImpl(disableConfig);
assertNull(disableFactory.create(query, ResponseContext.createEmpty()));