Fix deadlock that can occur while merging group by results #15420
Conversation
...ing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
let me try to sketch an alternate approach/idea I had while looking thru this PR:

what we have now:
- pool of global resources
- allocated 1-by-1

probable alternative:
- ask for a sub-pool from the global pool
- ask for buffers from that pool (it will return non-committed holders)
- 1-time allocation, in either of two ways:
  - have a method on the sub-pool to make all buffers live (like earlier); it can be called multiple times (at every place the stuff is used) but only the first call allocates (and locks)
  - on the 1st call to a holder which is dormant, all resources are allocated
- either way: once the sub-pool has gone live it should not return usable holders anymore

something like:
- the numMergeBuffers in GroupByMergingQueryRunnerV2 can be calculated earlier in run(), before the return; the merge buffer holders can be requested there (no backing commitment)
- before usage of the actual merge buffers, do a lock just before the usage of the stuff starts
- the GroupingEngine part could be similar... not sure
    mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers);
  }
  return mergeBufferHolder;

GroupByQueryResources resource = (GroupByQueryResources) responseContext.get(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS);
seems like responseContext is used as a backplane here...
@LakshSingla explained that this is a role this class could fill in the future (thx :) )
final int requiredMergeBufferNumForMergingQueryRunner =
    usesGroupByMergingQueryRunner
    ? GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(groupByQueryConfig, query)
    : 0;
I find it a pretty odd contract that it gets counted here and used somewhere else.
I think decoupling things like this could increase complexity significantly.
Yes, it is pretty bad. What's also unsettling is that we now wanna make sure that GroupByQueryMergingRunnerV2 expects the merge buffer from someone above in the chain, which enforces a contract between two different query runners, with only a generic responseContext that holds them.
Merge buffers do need a redesign, and I hope the changes introduced in this patch become obsolete.
I've experimented a bit with the above stuff - and although it could work - it's not that much different; it needed a different refactor to be done...
...src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@LakshSingla TY for the patch! Would you mind leaving an explanation in a comment here about how the fix works?
It'll aid in reviewing, and help anyone who comes back later trying to read about the logic.
One thing I'm wondering is what we need
Curious, when do we push down a subquery to a historical? Where does that logic sit?
I think @LakshSingla is talking about
Yes @abhishekagarwal87, if the flag is set, then the outer query along with the subquery is sent to the historicals. Else, only the subquery is sent to the historical and the broker does the processing on top of the subquery's results. @gianm I'll try to add the comments in the code itself, making it more self-explanatory.
Adding a comment here as well to aid in the code walkthrough, and for anyone revisiting the PR: The number of merge buffers required to execute a group by query is calculated based on the structure of the query. There are many levels of merging that can happen. Per my understanding, a raw segment gets aggregated after passing through the following structure:
Steps 1-3 happen on the data servers, while 4-5 happen on the brokers. The above describes an idealistic world, where there is no nested call between the mergeResults and the mergeRunners, and therefore a single place where the merge buffer can be acquired. However, there are two esoteric cases when this would not be true:
Therefore, in places where there's a nested call stack like mergeResults(.....mergeRunners(....)), the code acquires merge buffers in two places. This is true in:
The only place where we don't have a nested call stack is when the Broker merges the results from the historicals, wherein the mergeRunners is called by the historicals, and the mergeResults is called on the "combined" version of the runner returned by the historicals.
To disambiguate between these cases, there's a flag. In the new code, the
@gianm I can probably change the context parameter to be more generic and true for all the queries (CTX_KEY_USES_MERGE_RUNNERS), however actionable only in the group by toolchest.
Regarding acquisition and freeing up the merge buffers, the
still thinking on this PR, will have another pass soon
public static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution";

/**
 *
 */
public static final String CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2 = "runnerMergesUsingGroupByMergingQueryRunnerV2";
there are a bunch of group by query context keys in GroupByQuery; these should probably live there too instead of adding a new class to hold them. Alternatively, I guess they could have just stayed in GroupByMergingQueryRunnerV2 since they seem kind of specific to its function.
Also please drop the "V2" from this flag name since there is only v2 now.
/**
 * Reason for using this is to ensure that we do not set the merge buffers multiple times on the same response context
 */
public static final ResponseContext.Key RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS =
this could just live in GroupingEngine, or somewhere else if we move the query context parameters to GroupByQuery (or I suppose it could also live with the query context parameters in GroupByQuery).
QueryPlus<T> queryPlus1 = queryPlus.withQuery(
    queryPlus.getQuery().withOverriddenContext(
        ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
    )
);
final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus1, responseContext)
this seems off to apply it to all query types, when it only affects group by queries. Also, it doesn't really seem quite correct either, since if CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION is set, or if 'by segment' is set, mergeRunners does not take a merge buffer.
Query<T> queryToRun = newQuery.withOverriddenContext(
    ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false)
);
why is this set to false here, and then inside of the cluster walker (which is CachingClusteredClient) it is immediately set to true?
Also, shouldn't there be something for the local walker? The local walker is the only thing on the broker that calls 'mergeRunners' to use the group by merging query runner... otherwise the broker does not use it for any other query shape afaict.
I'm hoping we can fix the bug without making the code too much more complex. There's some things that contribute to the complexity and I'm hoping we can do some simpler alternative:
As an alternative to the context keys, maybe we can either add a parameter to …

As to the response context, I don't really understand why we're cloning it. What's the reason for that? In any case, a possible alternative could be to put a unique key in the query context (just a string) and then have the various parts of the query use that key to get their resources from something that's injected (like the merge buffer pool is currently injected). Or, an alternative could be to eliminate the cloning and have a requirement that only one …

Lastly, for testing, maybe I missed it but I didn't see a test case for the Broker-side scenario. The following query is the simplest one I could come up with that exhibits the problem. Perhaps …

{
  "queryType": "groupBy",
  "dataSource": {
    "type": "query",
    "query": {
      "queryType": "groupBy",
      "dataSource": {
        "type": "inline",
        "columnNames": [],
        "rows": []
      },
      "intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
      "granularity": "all"
    }
  },
  "intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
  "granularity": "all"
}
I think the use of keys has been exaggerated in my code, and can probably be minimized if we have a unique key set on the context per query, which can be used to reference the already reserved resources from a globally available pool (with some reservation built-in). With regards to cloning, I don't think there's a real need for it, apart from ensuring that the callers up the stack of query runners don't get a contaminated pool, which contains keys set by the children. I was envisioning this would be useful in case there are multiple mergeResults->mergeRunners chains called by a parent runner, running in parallel; the child runner won't have a chance to clean up the response context object while it gets passed to the other runner. Regarding the above comments, I have come up with the following alternative:
GroupByResourcesReservationPool {
  public void reserve(String uniqueId, int numMergeBuffers)
  public void reserve(String uniqueId, int numMergeBuffers, long timeoutMs)
  public GroupByQueryResources take(String uniqueId)
  public GroupByQueryResources cleanup(String uniqueId)
}
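A minimal sketch of how such a reservation pool could behave, with a plain Semaphore standing in for the global merge buffer pool and a permit count standing in for GroupByQueryResources (all hypothetical, not the PR's actual implementation):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: reserve everything a query will need up front,
// keyed by a unique id, then let runners down the chain fetch the
// reserved set by that id.
class ReservationPool {
  private final Semaphore mergeBuffers;  // stands in for the global buffer pool
  private final ConcurrentHashMap<String, Integer> reserved = new ConcurrentHashMap<>();

  ReservationPool(int capacity) {
    this.mergeBuffers = new Semaphore(capacity);
  }

  // Atomically reserve the query's full requirement; at most once per id.
  void reserve(String uniqueId, int numMergeBuffers) throws InterruptedException {
    mergeBuffers.acquire(numMergeBuffers);  // single acquisition point, no piecemeal waits
    if (reserved.putIfAbsent(uniqueId, numMergeBuffers) != null) {
      mergeBuffers.release(numMergeBuffers);
      throw new IllegalStateException("resources already reserved for " + uniqueId);
    }
  }

  // Runners fetch the reservation made for their query (null if none).
  Integer take(String uniqueId) {
    return reserved.get(uniqueId);
  }

  // Return the reservation to the global pool when the query finishes.
  void cleanup(String uniqueId) {
    Integer n = reserved.remove(uniqueId);
    if (n != null) {
      mergeBuffers.release(n);
    }
  }
}
```

The key property is that reserve() is the only place that blocks on the global pool, so a query never holds part of its requirement while waiting for the rest.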
@LakshSingla is this ready for another review or are some other changes pending?
@gianm I am working on fixing up the ITs and the UTs due to the changed semantics of resource passing.
@gianm With the recent changes made to the PR, I wanted to revisit the questions posed here, and more, that will help in reviewing.

Assumptions
These are true to my knowledge at the time of the PR (and they should remain true unless there's some whacky change in the query stack). Also, these assumptions need to be more valid for group-by queries, because only they require shared resources. The rest of the queries don't, and they don't rely on any of these assumptions being correct.

Resource ID

Tests modifications

Merge buffer allocation
The call to allocate the merge buffers in the pool is done by … Once the required merge buffers are allocated from the pool, they cannot be used by the other queries till the …
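The resource-id population step could be sketched like this (hypothetical names and a plain Map standing in for the query context; the PR's actual ResourceIdPopulatingQueryRunner works on Druid's Query objects):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: stamp a unique resource id onto the query context
// once, at the topmost entry point, so everything further down the runner
// chain can reference the same reserved resources by that id.
class ResourceIdSketch {
  static final String QUERY_RESOURCE_ID = "queryResourceId";

  // Stands in for Query#withOverriddenContext: returns a context map with
  // the resource id present, generating one only if it is absent.
  static Map<String, Object> populate(Map<String, Object> context) {
    if (context.containsKey(QUERY_RESOURCE_ID)) {
      return context;  // already stamped upstream; never overwrite
    }
    Map<String, Object> copy = new HashMap<>(context);
    copy.put(QUERY_RESOURCE_ID, UUID.randomUUID().toString());
    return copy;
  }
}
```

Making the population idempotent is what lets the assumption hold that a query has exactly one resource id for its whole lifetime, no matter how many runners re-enter the entry point.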
@gianm The PR is ready for review now. There are a few code coverage failures, but I think they are due to making changes in the unrelated classes using the
This is definitely cleaner than the prior approach. In the review comments I had some suggestions about making things clearer.
@@ -90,9 +90,9 @@ public FluentQueryRunner<T> postProcess(PostProcessingOperator<T> postProcessing
   return from(postProcessing != null ? postProcessing.postProcess(baseRunner) : baseRunner);
 }

-public FluentQueryRunner<T> mergeResults()
+public FluentQueryRunner<T> mergeResults(boolean willMergeRunner)
Please add javadoc for what willMergeRunner means. I recognize most other stuff in here doesn't have javadocs, but, still.
@@ -591,6 +591,10 @@ public boolean isWindowingStrictValidation()
   );
 }

+public String getQueryResourceId()
IMO it'd be good to make this a wrapper around String like ResourceId. That provides us a central place to put some javadocs that explain how resources work, and link with @link and @see to other relevant files. It also makes it easier to find usages in an IDE.
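A wrapper along those lines might look like the following hypothetical sketch (the actual class in the PR may differ):

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical sketch of a ResourceId wrapper around String: gives a central
// place for javadocs about resource handling, a type-safe map key, and easy
// usage search in an IDE.
final class ResourceId {
  private final String id;

  private ResourceId(String id) {
    this.id = Objects.requireNonNull(id, "id");
  }

  static ResourceId of(String id) {
    return new ResourceId(id);
  }

  static ResourceId random() {
    return new ResourceId(UUID.randomUUID().toString());
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof ResourceId && id.equals(((ResourceId) o).id);
  }

  @Override
  public int hashCode() {
    return id.hashCode();
  }

  @Override
  public String toString() {
    // Returning the bare id keeps string interpolations in logs readable.
    return id;
  }
}
```

With value semantics (equals/hashCode) the wrapper can directly replace String as the key of the reservation pool's map.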
 * determine if the mergeResults should acquire those resources for the merging runners, before beginning execution.
 * If not overridden, this method will ignore the {@code willMergeRunner} parameter.
 *
 * Ideally {@link #mergeResults(QueryRunner)} should have delegated to this method after setting the default value of
Couple questions about this:
- what should the "default value of willMergeRunner" be? It seems like a bunch of places use true, but why is that?
- what should new toolchests do? Is it ok to override just the new 2-arg mergeResults call, or do both need to be overridden?

I'm hoping we can make this willMergeRunner piece more clear, since IMO it's the main unclear thing left in the patch after the last round of changes.
 }

 @Inject
 public GroupByQueryQueryToolChest(
     GroupingEngine groupingEngine,
     Supplier<GroupByQueryConfig> queryConfigSupplier,
-    GroupByQueryMetricsFactory queryMetricsFactory
+    GroupByQueryMetricsFactory queryMetricsFactory,
+    @Merging GroupByResourcesReservationPool groupByResourcesReservationPool
As I understand it, there is no reason to use @Merging here, since there's only one kind of GroupByResourcesReservationPool. (The annotations are used to disambiguate when there are multiple kinds of some injectable key.)
True. I reasoned that @Merging was used to annotate the global resources associated with the group by merging, therefore I annotated the resource pool with it; it doesn't have any functional relevance. I am fine with removing it as well.
server/src/main/java/org/apache/druid/server/ResourceIdPopulatingQueryRunner.java
processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java
/**
 * Map of query's resource id -> group by resources reserved for the query to execute
 */
final ConcurrentHashMap<String, GroupByQueryResources> pool = new ConcurrentHashMap<>();
Use ResourceId as the key once it exists.
@gianm
Had a minor note but otherwise LGTM. Feel free to merge when tests are passing.
|
||
@Override | ||
public String toString() | ||
{ |
Simpler, and will make interpolations nicer, to just do return queryResourceId; here.
The coverage checks are failing, but due to the refactoring done on unrelated classes - DumpSegment, MaterializedViewQueryToolchest. There's also one Jacoco failure, which I think is due to the code coverage check itself.
Thanks for the reviews @gianm @kgyrtkirk @clintropolis |
Description
When trying to merge the higher order group by results on the broker (primarily), and perhaps on historicals (under special conditions when we push the subquery down to the historical), a deadlock can occur depending on the structure of the query.
The primary cause of this deadlock is that currently, we are trying to acquire the merge buffer resources in two places: GroupByQueryQueryToolchest (which is the place where we merge the higher-level results from the runners), and GroupByMergingQueryRunnerV2 (to merge the results from the runners of the multiple segments). These two were incorrectly considered to be mutually exclusive till now:
- GroupByQueryQueryToolchest - Acquires the merge buffers on the brokers only, as the historicals get the query with the subqueries and the grouping sets set to null, therefore this doesn't acquire any merge buffers. However, this might not be true, because when we push down the subquery itself, the historicals will also acquire the merge buffers at this place.
- GroupByMergingQueryRunnerV2 - Acquires merge buffers on the historicals only, as it primarily merges the results from the query runners that run on the segments. This is also incorrect because the broker can act as a data server itself when the data source is an inline data source, and we attempt to run nested group bys on that.
Therefore, we have conditions when the GroupByQueryQueryToolchest is holding some merge buffers to merge the results of the returned runners, however the runner itself is GroupByMergingQueryRunnerV2 (or a decorated runner on top of it), and it also requires merge buffers to merge the "segment-level" runners to provide to the mergeResults of the toolchest. As we do not acquire the resources in a single go, the following situation can happen:
- Total merge buffers on the broker: 2
- QueryA = QueryB = a query that needs 1 merge buffer to merge results (in the toolchest), and 1 merge buffer for GroupByMergingQueryRunnerV2 on the broker
- QueryA & QueryB are running simultaneously on the cluster
The queries could have passed in isolation, however now each is waiting on the other to release the single merge buffer it holds to proceed.
This PR prevents such a deadlock from happening by acquiring the merge buffers in a single place and passing them down to the runner that might need them.
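The deadlock can be reproduced in miniature with a plain Semaphore standing in for the merge buffer pool (a hedged sketch, not Druid code): piecemeal two-phase acquisition wedges both queries, while acquiring each query's full requirement in one place cannot.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Miniature of the deadlock: a pool of 2 buffers, two queries that each need
// 1 buffer in the toolchest's mergeResults and 1 in GroupByMergingQueryRunnerV2.
class DeadlockSketch {
  // Piecemeal: each query takes its toolchest buffer first, then tries for
  // its runner buffer. With both first-phase buffers gone, neither second
  // phase can ever succeed (tryAcquire with a timeout stands in for the
  // indefinite wait that actually happens).
  static boolean piecemeal(Semaphore pool) throws InterruptedException {
    pool.acquire();                                               // query A, toolchest buffer
    pool.acquire();                                               // query B, toolchest buffer
    boolean aRunner = pool.tryAcquire(10, TimeUnit.MILLISECONDS); // query A, runner buffer
    boolean bRunner = pool.tryAcquire(10, TimeUnit.MILLISECONDS); // query B, runner buffer
    return aRunner && bRunner;                                    // false: both stuck
  }

  // Single-place acquisition: a query takes both of its buffers at once,
  // so the pool is never left in a half-committed state.
  static boolean batch(Semaphore pool) {
    boolean a = pool.tryAcquire(2);  // query A, all buffers in one go
    if (a) {
      pool.release(2);               // query A finishes, releases everything
    }
    boolean b = pool.tryAcquire(2);  // query B, all buffers in one go
    if (b) {
      pool.release(2);
    }
    return a && b;
  }
}
```

In the batch variant a query either holds everything it needs or holds nothing, which is exactly the circular-wait condition the single acquisition point removes.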