Implement force push down for nested group by query #5471
Conversation
Great! I'll take a look soon.

Hi @samarthjain, I'll do my review today or tomorrow. Before that, I'd like to ask one question. It looks like a query context value decides whether or not nested query execution is pushed down. Does this mean all nested subqueries are pushed to historicals if the query is deeply nested?

@jihoonson - Currently, when the user sets the push nested query execution flag on the outermost query, the broker executes the entire nested query on the historical nodes. The broker then merges the results returned by the historical nodes as usual, except that in this case the rows returned by the historical nodes have the structure of the outermost part of the query.

Thanks. It sounds good for a first step. Will leave some comments soon!
@samarthjain nice work! Left some comments. The current approach looks good to me.
```java
  } else {
    return groupByStrategy.mergeResults(runner, query, context);
  }
}
```

```java
public static boolean shouldPushDownQuery(GroupByQuery q)
{
  return QueryContexts.parseBoolean(q, GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false);
```
Can be simplified to `return q.getContextBoolean(GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false);` and then https://github.com/druid-io/druid/pull/5471/files#diff-76d2b703319f06a8103ebce8a58ef7fcR221 is not necessary.
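For reference, here is a self-contained sketch of what such a context-boolean lookup does (a hypothetical class for illustration, not Druid's actual `QueryContexts` or `Query` implementation):

```java
import java.util.Map;

// Sketch of a context-boolean lookup: read a key from the query context map
// and fall back to a default when it is absent. Class and method names here
// are hypothetical.
public class ContextBooleanSketch
{
  public static boolean getContextBoolean(Map<String, Object> context, String key, boolean defaultValue)
  {
    if (context == null) {
      return defaultValue;
    }
    Object value = context.get(key);
    if (value == null) {
      return defaultValue;
    }
    if (value instanceof Boolean) {
      return (Boolean) value;
    }
    // Contexts deserialized from JSON may carry the flag as a string.
    return Boolean.parseBoolean(value.toString());
  }

  public static void main(String[] args)
  {
    Map<String, Object> ctx = Map.of("forceNestedQueryPushDown", "true");
    System.out.println(getContextBoolean(ctx, "forceNestedQueryPushDown", false)); // prints true
    System.out.println(getContextBoolean(ctx, "missingKey", false));               // prints false
  }
}
```

Since the query object already exposes this lookup as `getContextBoolean`, the separate static helper becomes unnecessary.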
```java
// Unset the push down nested query flag so that the historical doesn't erroneously end up pushing down the query itself
pushDownQueryBuilder.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false));
GroupByQuery queryToPushDown = pushDownQueryBuilder.build();
innerQueryBuilder.setQueryToPushDown(queryToPushDown);
```
Would you elaborate more on this? Does `setQueryToPushDown()` mean setting the outer query for the inner query?
This was done to make sure that we can pass the entire nested query over to the historical nodes. The role of the inner-most query is limited to figuring out what segments to query. But what gets executed on the historical nodes is this pushed-down query, which is nothing but the complete query. I think there is some improvement possible here. Right now, I am intercepting in DirectDruidClient to see whether the pushedDownQuery is present. If so, the DirectDruidClient gets hold of it and sends it over to the historical node.
Relevant snippet in DirectDruidClient: https://github.com/druid-io/druid/pull/5471/files#diff-604c8c11338380d32a71a7621f031d2aR172
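A toy sketch of that flow (hypothetical names; Druid's real query and client classes are far more involved): the inner query is used for segment discovery, but when a pushed-down query is attached, the client sends that instead.

```java
import java.util.Optional;

// Toy model of the interception described above. The "sql" string stands in
// for a real query object; pushedDownQuery is null unless force push down is
// set on the outermost query.
public class PushDownSketch
{
  public static class Query
  {
    public final String sql;
    public final Query pushedDownQuery;

    public Query(String sql, Query pushedDownQuery)
    {
      this.sql = sql;
      this.pushedDownQuery = pushedDownQuery;
    }

    public Optional<Query> getQueryToPushDown()
    {
      return Optional.ofNullable(pushedDownQuery);
    }
  }

  // Mimics the DirectDruidClient interception: send the pushed-down query if
  // present, otherwise send the (inner) query itself.
  public static String queryToSend(Query innerQuery)
  {
    return innerQuery.getQueryToPushDown().map(q -> q.sql).orElse(innerQuery.sql);
  }

  public static void main(String[] args)
  {
    Query fullNested = new Query("outer(inner)", null);
    Query inner = new Query("inner", fullNested);
    // The inner query selected the segments, but the full nested query is sent.
    System.out.println(queryToSend(inner)); // prints outer(inner)
  }
}
```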
```diff
@@ -143,7 +156,7 @@ public boolean apply(Row input)
     Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
         query,
-        true,
+        true, //todo: samarth think about this attribute. I think this might be false in our case?
```
Not sure, but this comment might help:

> If isInputRaw is true, transformations such as timestamp truncation and extraction functions have not been applied to the input rows yet, for example, in a nested query, if an extraction function is being applied in the outer query to a field of the inner query. This method must apply those transformations.
```diff
@@ -102,6 +102,7 @@ public static Builder builder()
   private final boolean applyLimitPushDown;
   private final Function<Sequence<Row>, Sequence<Row>> postProcessingFn;
+  private final GroupByQuery pushedDownQuery;
```
Not sure this should be kept. When is this variable used?
Thanks for the feedback @jihoonson. I will address it in my next commit. Can you point me to how I should go about writing tests for this feature? I looked at GroupByQueryRunnerTest and I couldn't find an easy way of mocking the push down behavior. I essentially need a way to mimic the nested query getting executed on the historical nodes, with the outermost query then operating on the merged results returned by the historicals. Would it make sense to instead use Druid SQL and write tests in something like CalciteQueryTest?

@samarthjain I think you can check out

@jon-wei thanks. It looks like a good example to test merging.

Can this PR include more description in the master comment about what it is accomplishing and how it is accomplishing it?

@jihoonson - I have updated the pull request with review comments, did some cleanup, and added tests.
The way nested query execution is implemented today, the inner-most query is executed on the historical nodes while the outer queries are executed on the broker node. This gets problematic when the inner query groups on a high-cardinality dimension, returning too many records for the broker to handle. One of the options that we have been internally testing and exploring is the capability to push down the complete nested query to the historical nodes. Each historical node then executes the nested query against the segments it owns for that query. Because the number of records returned by each historical node is potentially much smaller in this case, the final merge and aggregation is less intensive for the broker. The broker also won't need to perform any more dimension or segment level filtering, since that is taken care of on the historical nodes themselves.

Note that distributing the aggregation to the historical nodes this way doesn't always return the same results as performing the final aggregation on the broker node. However, there is a good set of cases (for example, aggregating on dimensions that were used for hashing during ingestion) where this kind of push down logic returns the right results. I can go into more detail, but the general idea was to leave the onus on the user to figure out whether their data layout allows for this kind of push down. This implementation provides the user a way of forcing nested query push down through a query context variable. The next cut will focus on doing this automatically under the following conditions (credit to @gianm for clearly articulating them):

and either:

2a) A time chunk uses HashBasedNumberedShardSpec, partitionDimensions is nonempty, the grouping dimension set contains all of the shard partitionDimensions, and there are no "extension" partitions (partitions with partitionNum >= partitions, which are created by ingest tasks that append data)

or:

2b) A time chunk uses SingleDimensionShardSpec and the grouping dimension set contains the shard dimension.

If Druid detects this, it should push down the query automatically for that time chunk. There will be situations where the query can be pushed down for some time chunks but not others (for example: consider a data pipeline that loads data unpartitioned in realtime, and later runs a reindexing job to partition historical data by some useful dimension). In this case, ideally the broker should be capable of pushing the query down for the time chunks where it can be correctly pushed down, and not for the others.
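Those two eligibility conditions could be sketched as simple set checks (a hypothetical helper for illustration only; this PR implements just the forced variant, not the automatic one):

```java
import java.util.List;
import java.util.Set;

// Hedged sketch of the automatic-push-down eligibility checks described
// above. Class and method names are hypothetical, not part of this PR.
public class PushDownEligibility
{
  // Hash-partitioned time chunk: push down is safe when partitionDimensions
  // is nonempty, the grouping dimensions cover all partitionDimensions, and
  // no "extension" partitions were appended later.
  public static boolean canPushDownHashPartitioned(
      Set<String> groupingDimensions,
      List<String> partitionDimensions,
      boolean hasExtensionPartitions
  )
  {
    return !partitionDimensions.isEmpty()
           && groupingDimensions.containsAll(partitionDimensions)
           && !hasExtensionPartitions;
  }

  // Single-dimension range partitioning: the grouping dimension set must
  // contain the shard dimension.
  public static boolean canPushDownSingleDim(Set<String> groupingDimensions, String shardDimension)
  {
    return groupingDimensions.contains(shardDimension);
  }
}
```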
@samarthjain thanks. I'll take another look.

@samarthjain we are suffering from pretty frequent CI failures. If the CI for your PR fails, you first need to make sure the failure relates to the changes in your PR. This can be checked by running the unit tests locally. Usually the memory configuration for CI shouldn't be a problem. If you find something wrong in the CI configuration, please make another PR to fix it.

@jihoonson - sorry for the late reply. I have cleaned up the code, removed the pom changes, and added an integration test. I wanted to check with you: how is the integration test triggered? How is the data needed for the test loaded? I looked at the existing integration tests and couldn't figure out where that magic happens.

Looks like the framework is not able to locate the wikipedia data source which the new test needs. @jihoonson, the test passes for me locally after loading the wikipedia datasource as described at http://druid.io/docs/latest/tutorials/quickstart.html. Would be great if you can let me know how to load new data for the IT tests.

@jihoonson - any insights on how to add and execute new integration tests for CI?

@samarthjain I'll take a look today. Sorry for the late review.
@samarthjain thanks for adding tests especially an integration test! I left some comments.
Besides these, I have three more comments on the overall implementation design.

1. If I understand correctly, once `forceNestedQueryPushDown` is set to true, the inner-most query is passed from `GroupByQueryQueryToolChest` to `DirectDruidClient` to find which historicals should receive the query, and then the pushed-down nested query is what actually gets sent from `DirectDruidClient`. I think this kind of value hijacking can cause many potential bugs. Instead, I suggest adding an interface to Query for finding the dataSource of the inner-most query.
2. You added a flag like `wasQueryPushedDown` to existing methods in order to reuse them. As a result, some methods have too many responsibilities, and I think this makes it difficult to track down the query execution path. For example, `GroupByRowProcessor.process()` is supposed to be called only when processing subqueries, but now it looks to be called when merging subqueries as well. I think this was done to avoid code duplication, but it can be improved by splitting these methods and extracting common code paths into utility methods.
3. A groupBy v2 query requires acquiring a resource before query execution. We currently check that there are enough merge buffers to execute a given query. Since historicals have so far processed only the inner-most query, they have always required a single merge buffer. However, since they can process nested queries after this patch, resource reservation should be changed as well. Please see here for how the number of merge buffers is calculated for brokers.
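The merge-buffer concern can be illustrated with a simplified model (a sketch only, with a hypothetical class name; Druid's actual calculation differs in details such as capping and subtotal handling):

```java
// Simplified model of why resource reservation must change: a groupBy v2
// query's merge-buffer demand grows with the nesting depth of its
// subqueries, so a historical that now executes the whole nested query can
// no longer assume a single buffer is always enough.
public class MergeBufferEstimate
{
  // A query is modeled only by its nesting depth: 1 = flat groupBy,
  // 2 = one subquery, and so on.
  public static int requiredMergeBuffers(int nestingDepth)
  {
    if (nestingDepth < 1) {
      throw new IllegalArgumentException("depth must be >= 1");
    }
    // A flat groupBy merges results on the fly and needs no extra buffer;
    // in this simplified model each additional nesting level needs one more
    // buffer to hold intermediate results (the real implementation caps
    // this and accounts for other features).
    return nestingDepth - 1;
  }
}
```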
```diff
@@ -240,7 +240,7 @@
   }
 }

-  static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
+  public static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
```
This isn't supposed to be used outside of this class. Please consider #5471 (comment).
```java
new Callable<Boolean>()
{
  @Override
  public Boolean call() throws Exception
```
`Exception` is not thrown.
```java
  {
    return coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE);
  }
}, "wikipedia segment load"
```
Please break the line before `"wikipedia segment load"`.
```java
{
  // ensure that wikipedia segments are loaded completely
  RetryUtil.retryUntilTrue(
      new Callable<Boolean>()
```
nit: this `Callable` can be simplified to `() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE)`.
```java
import io.druid.segment.DimensionSelector;
import io.druid.segment.column.ValueType;

public class PushDownQueryDimensionSpec implements DimensionSpec
```
Would you elaborate on how this class is used? It looks like it does nothing except forward requests to the underlying delegate.
The dimension name for this dimension spec is the output name of the delegate. This dimension spec is only used in the case of nested query push down.

```java
@Override
public String getDimension()
{
  // the dimension name is same as the output name.
  return delegate.getOutputName();
}
```
```java
).withOverriddenContext(
    ImmutableMap.<String, Object>of(
        // the having spec shouldn't be passed down, so we need to convey the existing limit push down status
        GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, query.isApplyLimitPushDown(),
```
Would you elaborate more on this? Why is the query's `limitPushDown` passed even though the `having` spec is never passed?
I actually copied this code/comment from https://github.com/druid-io/druid/blob/master/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java#L251.
```java
subqueryContext.putAll(((QueryDataSource) dataSource).getQuery().getContext());
pushDownQuery = groupByStrategy.supportsNestedQueryPushDown() && shouldPushDownQuery(query);
if (pushDownQuery) {
  return getResultsOnPushDown(groupByStrategy, query, resource, runner, context);
```
IMO, this kind of `return` statement in the middle of a method can make the code difficult for readers to follow. I suggest splitting this method into several smaller methods.
```diff
@@ -108,4 +108,9 @@ public GroupByQueryRunner(Segment segment, final GroupByStrategySelector strateg
     return strategySelector.strategize((GroupByQuery) query).process((GroupByQuery) query, adapter);
   }
 }
+
+  public GroupByStrategySelector getStrategySelector()
```
Please annotate with `@VisibleForTesting`.
```java
    subqueryResult,
    subquery
);
return groupByStrategy.processSubqueryResult(newSubquery, query, resource, finalizingResults, pushDownQuery);
```
`pushDownQuery` is always false.
```diff
 final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);

 final AggregatorFactory[] aggregatorFactories = new AggregatorFactory[query.getAggregatorSpecs().size()];
 for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
-  aggregatorFactories[i] = query.getAggregatorSpecs().get(i);
+  AggregatorFactory af = query.getAggregatorSpecs().get(i);
+  aggregatorFactories[i] = wasQueryPushedDown ? af.getCombiningFactory() : af;
```
Why is the combining factory used here?
In case of nested query push down, the historicals are going to return aggregated results and the broker's job is to combine these aggregated results - hence the use of combining aggregator factory.
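A toy illustration of the combining-factory idea (not Druid's real `AggregatorFactory` API, just the arithmetic): when historicals return pre-aggregated partial counts, the broker must sum the partials rather than count rows again.

```java
import java.util.stream.LongStream;

// Toy sketch of why the broker uses the combining form of an aggregator
// when the query was pushed down. Names here are hypothetical.
public class CombiningFactorySketch
{
  // "count" applied to raw rows: one per input row.
  public static long countRaw(long numRows)
  {
    return numRows;
  }

  // The combining form of count is a sum over the partial counts returned
  // by each historical.
  public static long combinePartialCounts(long[] partialCounts)
  {
    return LongStream.of(partialCounts).sum();
  }

  public static void main(String[] args)
  {
    // Two historicals each counted rows in their own segments. Re-counting
    // would yield 2; combining correctly yields 200.
    long[] partials = {120L, 80L};
    System.out.println(combinePartialCounts(partials)); // prints 200
  }
}
```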
Thanks.
Can we do this kind of query rewriting for dimensions and aggregatorFactories in `GroupByQueryQueryToolChest.getResultsOnPushDown()`? It would be better because we could put everything necessary for nested query push down together where `isNestedQueryPushDown()` is checked.
The integration test fails on my laptop as well. Also, when I tested the below query with my local cluster, I hit this error.
Thanks for the detailed feedback, @jihoonson. I will review it and reply by tonight.
The process method isn't doing any additional merges. The only change is to make it aware that, in the forcePushDown case, the sub-query sequence it processes is the result of running the pushed-down nested query instead.
Good point. I will make sure that historicals have the same number of merge buffers as the broker. From a first look at the code it seems like they should, but I will verify and hopefully add a test for it too.
Thanks. Let me see how I can make this cleaner. I am not the biggest fan of the approach I took, either.
Did you update the jar on the historicals too?

@samarthjain yes, I tested on my local laptop and used the jar of the same version for all modules.

Addressed the review comments you left, @jihoonson. I was able to get rid of the hacky approach I took for getting hold of the complete nested query in DirectDruidClient by using it directly higher in the stack in GroupByQueryToolChest and GroupByStrategyV2. Also fixed the failure you ran into when using a multi-level nested query. To run the integration test on my laptop, I had to add a config file and supply its path in the ConfigFileConfigProvider. The contents of the config file looked like this: I also had to change IntegrationTestingConfigProvider to use the ConfigFileConfigProvider and not DockerConfigProvider. @nishantmonu51 - would you happen to know how I can get my newly added integration test to run automatically in the pre-checkin builds?
@samarthjain thanks for the update! I'll take another look.
Would you elaborate on why this configuration is needed? It's supposed to get addresses automatically in the integration tests.
What do you mean by this? If you add an integration test, it should be run automatically on Travis. You can verify this by checking the log on Travis or running the integration test yourself.
Also, please check the CI failure.
Not sure why the Travis build is complaining. It seems to be building fine locally. @jihoonson - is there a way for you to retry the build?

Ok, looks like there was a rebase issue with my branch. @jihoonson - it would be awesome if you can take a look when you get a chance.

@jihoonson - I see that a 0.13.0 release is being planned. It would be great if this feature could make the 0.13.0 release.

Hi @samarthjain! Thank you for updating this PR. I'll take a look soon, probably next week at the earliest. Unfortunately, the feature set for 0.13.0 is frozen, but I'm scheduling this for 0.13.1. Thanks!
@samarthjain I left probably my last comments. Please take a look. Thanks!
```diff
-final AggregateResult retVal = filteredSequence.accumulate(AggregateResult.ok(), accumulator);
+final AggregateResult retVal = wasQueryPushedDown
+    ? rows.accumulate(AggregateResult.ok(), accumulator)
+    : filteredSequence.accumulate(AggregateResult.ok(), accumulator);
```
Is this because the filters are already processed when `wasQueryPushedDown` is true? If so, please add a comment about it here.
Also, I think we can clarify this part a bit more. For example, all the filter-related variables like `filteredSequence`, `filterMatcher`, `filter`, etc. don't have to be initialized. Would you please extract this part into a separate method and skip calling it if `wasQueryPushedDown` is true?
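A minimal sketch of that suggestion (hypothetical class and method names): short-circuit the filtering stage entirely when the rows come from a pushed-down query, since the historicals have already applied the filters.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Sketch only: rows are modeled as strings and the filter as a Predicate,
// standing in for Druid's Row/filter machinery.
public class FilterSkipSketch
{
  public static List<String> maybeFilter(List<String> rows, Predicate<String> filter, boolean wasQueryPushedDown)
  {
    if (wasQueryPushedDown) {
      // Filters were already applied on the historicals; pass rows through
      // without constructing any filter-related objects.
      return rows;
    }
    return rows.stream().filter(filter).collect(Collectors.toList());
  }
}
```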
```java
);
GroupByQuery rewrittenQuery = ((GroupByQueryQueryToolChest) toolChest).rewriteNestedQueryForPushDown(nestedQuery);
// Broker executes this code and hence has
return strategy.applyPostProcessing(strategy.processSubqueryResult(
```
Can we call `GroupByQueryQueryToolChest.mergeResults()` instead of calling this directly? I think it would be better because the unit test would then execute the same logic as the production code.
Thanks for your patient reviews, @jihoonson. I have updated the PR with the requested changes. The Travis build seems to be timing out.

@jihoonson - does this look good to merge now?
@samarthjain thanks for the fix! I restarted Travis. +1 after CI.
CI looks good. Thanks for restarting it and for all your reviews, @jihoonson. Let me know if you need me to squash the commits.

@samarthjain nice! You don't have to squash commits in most cases. I'll merge this shortly.
```java
Assert.assertEquals(outputNameAgg, rewrittenQuery.getAggregatorSpecs().get(0).getName());
}

private List<QueryRunner<Row>> getRunner1()
```
Unused method
```java
return runners;
}

private List<QueryRunner<Row>> getRunner2()
```
Unused method
Builds on apache#9111 and implements the datasource analysis mentioned in apache#8728. Still can't handle join datasources, but we're a step closer.

Join-related DataSource types:
1) Add "join", "lookup", and "inline" datasources.
2) Add "getChildren" and "withChildren" methods to DataSource, which will be used in the future for query rewriting (e.g. inlining of subqueries).

DataSource analysis functionality:
1) Add DataSourceAnalysis class, which breaks down datasources into three components: outer queries, a base datasource (left-most of the highest level left-leaning join tree), and other joined-in leaf datasources (the right-hand branches of the left-leaning join tree).
2) Add "isConcrete", "isGlobal", and "isCacheable" methods to DataSource in order to support analysis.
3) Use the DataSourceAnalysis methods throughout the query handling stack, replacing various ad-hoc approaches. Most of the interesting changes are in ClientQuerySegmentWalker (brokers), ServerManager (historicals), and SinkQuerySegmentWalker (indexing tasks).

Other notes:
1) Changed TimelineServerView to return an Optional timeline, which I thought made the analysis changes cleaner to implement.
2) Renamed DataSource#getNames to DataSource#getTableNames, which I think is clearer. Also, made it a Set, so implementations don't need to worry about duplicates.
3) Added QueryToolChest#canPerformSubquery, which is now used by query entry points to determine whether it is safe to pass a subquery dataSource to the query toolchest. Fixes an issue introduced in apache#5471 where subqueries under non-groupBy-typed queries were silently ignored, since neither the query entry point nor the toolchest did anything special with them.
4) The addition of "isCacheable" should work around apache#8713, since UnionDataSource now returns false for cacheability.
Builds on apache#9235, using the datasource analysis functionality to replace various ad-hoc approaches. The most interesting changes are in ClientQuerySegmentWalker (brokers), ServerManager (historicals), and SinkQuerySegmentWalker (indexing tasks).

Other changes related to improving how we analyze queries:
1) Changes TimelineServerView to return an Optional timeline, which I thought made the analysis changes cleaner to implement.
2) Added QueryToolChest#canPerformSubquery, which is now used by query entry points to determine whether it is safe to pass a subquery dataSource to the query toolchest. Fixes an issue introduced in apache#5471 where subqueries under non-groupBy-typed queries were silently ignored, since neither the query entry point nor the toolchest did anything special with them.
3) Removes the QueryPlus.withQuerySegmentSpec method, which was mostly being used in error-prone ways (ignoring any potential subqueries, and not verifying that the underlying data source is actually a table). Replaces it with a new function, Queries.withSpecificSegments, that includes sanity checks.
@gianm @jihoonson - creating a PR as we discussed over email. The pull request isn't complete by any means; I just wanted to get your feedback on the approach I have taken. For now, I have implemented the force push down feature. Once we have finalized the overall approach, I will provide a patch for automatic push down separately. Thanks!