Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Foundations for an operator-based approach for Druid queries #12641

Closed
wants to merge 11 commits into from
53 changes: 44 additions & 9 deletions processing/src/main/java/org/apache/druid/query/QueryPlus.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.queryng.fragment.FragmentBuilder;

import javax.annotation.Nullable;

Expand All @@ -40,18 +41,24 @@ public final class QueryPlus<T>
public static <T> QueryPlus<T> wrap(Query<T> query)
{
Preconditions.checkNotNull(query);
return new QueryPlus<>(query, null, null);
return new QueryPlus<>(query, null, null, null);
}

private final Query<T> query;
private final QueryMetrics<?> queryMetrics;
private final String identity;
private final FragmentBuilder fragmentBuilder;

private QueryPlus(Query<T> query, QueryMetrics<?> queryMetrics, String identity)
private QueryPlus(
Query<T> query,
QueryMetrics<?> queryMetrics,
String identity,
FragmentBuilder fragmentBuilder)
{
this.query = query;
this.queryMetrics = queryMetrics;
this.identity = identity;
this.fragmentBuilder = fragmentBuilder;
}

public Query<T> getQuery()
Expand All @@ -71,7 +78,7 @@ public QueryMetrics<?> getQueryMetrics()
*/
public QueryPlus<T> withIdentity(String identity)
{
return new QueryPlus<>(query, queryMetrics, identity);
return new QueryPlus<>(query, queryMetrics, identity, fragmentBuilder);
}

/**
Expand All @@ -89,16 +96,24 @@ public QueryPlus<T> withQueryMetrics(QueryToolChest<T, ? extends Query<T>> query
if (queryMetrics != null) {
return this;
} else {
final QueryMetrics metrics = ((QueryToolChest) queryToolChest).makeMetrics(query);
final QueryMetrics<?> metrics = ((QueryToolChest) queryToolChest).makeMetrics(query);

if (identity != null) {
metrics.identity(identity);
}

return new QueryPlus<>(query, metrics, identity);
return new QueryPlus<>(query, metrics, identity, fragmentBuilder);
}
}

/**
 * Returns a copy of this QueryPlus with query metrics removed, or this same
 * instance when no metrics are attached.
 */
public QueryPlus<T> withoutMetrics()
{
  return queryMetrics == null ? this : new QueryPlus<>(query, null, identity, fragmentBuilder);
}

/**
* Returns a QueryPlus object without the components which are unsafe for concurrent use from multiple threads,
* therefore couldn't be passed down in concurrent or async {@link QueryRunner}s.
Expand All @@ -120,7 +135,7 @@ private QueryPlus<T> withoutQueryMetrics()
if (queryMetrics == null) {
return this;
} else {
return new QueryPlus<>(query, null, identity);
return new QueryPlus<>(query, null, identity, fragmentBuilder);
}
}

Expand All @@ -132,7 +147,8 @@ public QueryPlus<T> withMaxQueuedBytes(long maxQueuedBytes)
return new QueryPlus<>(
query.withOverriddenContext(ImmutableMap.of(QueryContexts.MAX_QUEUED_BYTES_KEY, maxQueuedBytes)),
queryMetrics,
identity
identity,
fragmentBuilder
);
}

Expand All @@ -141,7 +157,7 @@ public QueryPlus<T> withMaxQueuedBytes(long maxQueuedBytes)
*/
public <U> QueryPlus<U> withQuery(Query<U> replacementQuery)
{
return new QueryPlus<>(replacementQuery, queryMetrics, identity);
return new QueryPlus<>(replacementQuery, queryMetrics, identity, fragmentBuilder);
}

public Sequence<T> run(QuerySegmentWalker walker, ResponseContext context)
Expand All @@ -151,6 +167,25 @@ public Sequence<T> run(QuerySegmentWalker walker, ResponseContext context)

public QueryPlus<T> optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
{
return new QueryPlus<>(query.optimizeForSegment(optimizationContext), queryMetrics, identity);
return new QueryPlus<>(
query.optimizeForSegment(optimizationContext),
queryMetrics,
identity,
fragmentBuilder);
}

/**
 * Returns a copy of this QueryPlus with the given fragment builder attached.
 * The fragment builder enables this query to use the "Next Gen" query engine.
 * The builder may be null, which indicates to use the "classic" rather than
 * "NG" engine.
 */
public QueryPlus<T> withFragmentBuilder(FragmentBuilder fragmentBuilder)
{
return new QueryPlus<>(query, queryMetrics, identity, fragmentBuilder);
}

/**
 * Returns the fragment builder used by the "Next Gen" query engine, or null
 * when this query runs on the "classic" engine.
 */
public FragmentBuilder fragmentBuilder()
{
return fragmentBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -330,4 +330,10 @@ public Sequence<Object[]> resultsAsArrays(QueryType query, Sequence<ResultType>
{
throw new UOE("Query type '%s' does not support returning results as arrays", query.getType());
}

/**
 * Variation of the query-based {@code resultsAsArrays} that receives the full
 * {@code QueryPlus}, so that toolchests aware of the "Next Gen" engine can
 * consult its state (see the Scan query toolchest override). The default
 * implementation simply delegates to the query-based overload.
 */
@SuppressWarnings({"unchecked", "unused"})
public Sequence<Object[]> resultsAsArrays(QueryPlus<ResultType> query, Sequence<ResultType> resultSequence)
{
return resultsAsArrays((QueryType) query.getQuery(), resultSequence);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
this.limit = query.getScanRowsLimit();
Query<ScanResultValue> historicalQuery =
queryPlus.getQuery().withOverriddenContext(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false));
Sequence<ScanResultValue> baseSequence = baseRunner.run(QueryPlus.wrap(historicalQuery), responseContext);
// No metrics past this point: metrics are not thread-safe.
QueryPlus<ScanResultValue> wrapped = queryPlus.withQuery(historicalQuery).withoutMetrics();
Sequence<ScanResultValue> baseSequence = baseRunner.run(wrapped, responseContext);
this.yielder = baseSequence.toYielder(
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@
import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.queryng.config.QueryNGConfig;
import org.apache.druid.queryng.planner.ScanPlanner;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
Expand Down Expand Up @@ -66,6 +69,9 @@ public ScanQueryQueryToolChest(
public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultValue> runner)
{
return (queryPlus, responseContext) -> {
if (QueryNGConfig.enabledFor(queryPlus)) {
return ScanPlanner.runLimitAndOffset(queryPlus, runner, responseContext, scanQueryConfig);
}
final ScanQuery originalQuery = ((ScanQuery) (queryPlus.getQuery()));
ScanQuery.verifyOrderByForNativeExecution(originalQuery);

Expand Down Expand Up @@ -213,7 +219,7 @@ public Sequence<Object[]> resultsAsArrays(final ScanQuery query, final Sequence<
// Uh oh... mismatch in expected and actual field count. I don't think this should happen, so let's
// throw an exception. If this really does happen, and there's a good reason for it, then we should remap
// the result row here.
throw new ISE("Mismatch in expected[%d] vs actual[%s] field count", fields.size(), row.size());
throw new ISE("Mismatch in expected [%d] vs actual [%s] field count", fields.size(), row.size());
}
};
break;
Expand All @@ -224,10 +230,24 @@ public Sequence<Object[]> resultsAsArrays(final ScanQuery query, final Sequence<
return resultSequence.flatMap(
result -> {
// Generics? Where we're going, we don't need generics.
final List rows = (List) result.getEvents();
final Iterable arrays = Iterables.transform(rows, (Function) mapper);
@SuppressWarnings("unchecked")
final List<Object[]> rows = (List<Object[]>) result.getEvents();
@SuppressWarnings("unchecked")
final Iterable<Object[]> arrays = Iterables.transform(rows, (Function) mapper);
return Sequences.simple(arrays);
}
);
}

@Override
public Sequence<Object[]> resultsAsArrays(QueryPlus<ScanResultValue> queryPlus, Sequence<ScanResultValue> resultSequence)
{
ScanQuery query = (ScanQuery) queryPlus.getQuery();
// Route to the "Next Gen" engine's array conversion when it is enabled for
// this query; otherwise fall back to the classic query-based overload.
if (QueryNGConfig.enabledFor(queryPlus)) {
final List<String> fields = resultArraySignature(query).getColumnNames();
return ScanPlanner.resultsAsArrays(queryPlus, fields, resultSequence);
} else {
return resultsAsArrays(query, resultSequence);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.queryng.config.QueryNGConfig;
import org.apache.druid.queryng.planner.ScanPlanner;
import org.apache.druid.segment.Segment;
import org.joda.time.Interval;

Expand Down Expand Up @@ -87,8 +89,18 @@ public QueryRunner<ScanResultValue> mergeRunners(
final Iterable<QueryRunner<ScanResultValue>> queryRunners
)
{
// in single thread and in Jetty thread instead of processing thread
// In single thread and in Jetty thread instead of processing thread
return (queryPlus, responseContext) -> {
if (QueryNGConfig.enabledFor(queryPlus)) {
Sequence<ScanResultValue> results = ScanPlanner.runMerge(
queryPlus,
queryRunners,
responseContext);
if (results != null) {
return results;
}
}

ScanQuery query = (ScanQuery) queryPlus.getQuery();
ScanQuery.verifyOrderByForNativeExecution(query);

Expand Down Expand Up @@ -147,7 +159,7 @@ public QueryRunner<ScanResultValue> mergeRunners(
((SinkQueryRunners<ScanResultValue>) queryRunners).runnerIntervalMappingIterator()
.forEachRemaining(intervalsAndRunnersOrdered::add);
} else {
throw new ISE("Number of segment descriptors does not equal number of "
throw new ISE("Number of segment descriptors does not equal the number of "
+ "query runners...something went wrong!");
}

Expand Down Expand Up @@ -295,7 +307,7 @@ List<Interval> getIntervalsFromSpecificQuerySpec(QuerySegmentSpec spec)
} else {
throw new UOE(
"Time-ordering on scan queries is only supported for queries with segment specs "
+ "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.",
+ "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec. A [%s] was received instead.",
spec.getClass().getSimpleName()
);
}
Expand All @@ -310,8 +322,8 @@ Sequence<ScanResultValue> nWayMergeAndLimit(
)
{
// Starting from the innermost Sequences.map:
// (1) Deaggregate each ScanResultValue returned by the query runners
// (2) Combine the deaggregated ScanResultValues into a single sequence
// (1) Disaggregate each ScanResultValue returned by the query runners
// (2) Combine the disaggregated ScanResultValues into a single sequence
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence
Expand Down Expand Up @@ -361,11 +373,14 @@ public ScanQueryRunner(ScanQueryEngine engine, Segment segment)
@Override
public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, ResponseContext responseContext)
{
if (QueryNGConfig.enabledFor(queryPlus)) {
return ScanPlanner.runScan(queryPlus, segment, responseContext);
}

Query<ScanResultValue> query = queryPlus.getQuery();
if (!(query instanceof ScanQuery)) {
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class);
}

ScanQuery.verifyOrderByForNativeExecution((ScanQuery) query);

// it happens in unit tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,17 @@ public Object getEvents()
return events;
}

/**
 * Returns the events as a typed list. The cast is unchecked: callers are
 * responsible for choosing a T that matches the actual row representation
 * of this result's format.
 */
@SuppressWarnings("unchecked")
public <T> List<T> getRows()
{
return (List<T>) getEvents();
}

public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
{
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
Object timestampObj = ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
final List<Map<String, Object>> rows = getRows();
Object timestampObj = rows.get(0).get(ColumnHolder.TIME_COLUMN_NAME);
if (timestampObj == null) {
throw new ISE("Unable to compare timestamp for rows without a time column");
}
Expand All @@ -90,22 +97,27 @@ public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
if (timeColumnIndex == -1) {
throw new ISE("Unable to compare timestamp for rows without a time column");
}
List<Object> firstEvent = (List<Object>) ((List<Object>) this.getEvents()).get(0);
final List<List<Object>> rows = getRows();
final List<Object> firstEvent = rows.get(0);
return DimensionHandlerUtils.convertObjectToLong(firstEvent.get(timeColumnIndex));
}
throw new UOE("Unable to get first event timestamp using result format of [%s]", resultFormat.toString());
}

public List<ScanResultValue> toSingleEventScanResultValues()
{
List<ScanResultValue> singleEventScanResultValues = new ArrayList<>();
List<Object> events = (List<Object>) this.getEvents();
for (Object event : events) {
final List<ScanResultValue> singleEventScanResultValues = new ArrayList<>();
for (Object event : getRows()) {
singleEventScanResultValues.add(new ScanResultValue(segmentId, columns, Collections.singletonList(event)));
}
return singleEventScanResultValues;
}

/**
 * Returns the number of events (rows) held in this result value.
 */
public int rowCount()
{
return getRows().size();
}

@Override
public boolean equals(Object o)
{
Expand Down
Loading