Fix another deadlock which can occur while acquiring merge buffers #16372

Merged
merged 7 commits into from May 8, 2024
Changes from 5 commits
@@ -28,6 +28,7 @@
import javax.inject.Inject;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
* Reserves the {@link GroupByQueryResources} for a given group by query and maps them to the query's resource ID.
@@ -67,21 +68,25 @@
* nested ones execute via an unoptimized way.
* 3. There's some knowledge to the mergeResults that the query runner passed to it is the one created by the corresponding toolchest's
* mergeRunners (which is the typical use case). This is encoded in the argument {@code willMergeRunner}, and is to be set by the callers.
* The only production use case where this isn't true is when the broker is merging the results gathered from the historical)
* The only production use case where this isn't true is when the broker is merging the results gathered from the historical
* <p>
* TESTING
* Unit tests mimic the broker-historical interaction in many places, which can lead to the code not working as intended because the assumptions don't hold.
* In many test cases, there are two nested mergeResults calls, the outer call mimics what the broker does, while the inner one mimics what the historical does,
* and the assumption (1) fails. Therefore, the testing code should assign a unique resource id between each mergeResults call, and also make sure that the top level mergeResults
* would have willMergeRunner = false, since it's being called on top of a mergeResults's runner, while the inner one would have willMergeRunner = true because it's being
* called on actual runners (as it happens in the brokers, and the historicals)
* called on actual runners (as it happens in the brokers, and the historicals).
* <p>
* There is a test in GroupByResourcesReservationPoolTest that checks for deadlocks when the operations are interleaved in a
* certain manner. It is ignored because it sleeps and would increase the time taken to run the test suite. Developers making any changes
* to this class, or a related class, should manually verify that all the tests in the test class run as expected.
*/
public class GroupByResourcesReservationPool
{
/**
* Map of query's resource id -> group by resources reserved for the query to execute
*/
final ConcurrentHashMap<QueryResourceId, GroupByQueryResources> pool = new ConcurrentHashMap<>();
final ConcurrentHashMap<QueryResourceId, AtomicReference<GroupByQueryResources>> pool = new ConcurrentHashMap<>();

/**
* Buffer pool from where the merge buffers are picked and reserved
@@ -104,19 +109,42 @@
}

/**
* Reserves appropriate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map
* Reserves appropriate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map.
* This is a blocking call, and can block up to the given query's timeout
*/
public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner)
{
if (queryResourceId == null) {
throw DruidException.defensive("Query resource id must be populated");
}
pool.compute(queryResourceId, (id, existingResource) -> {
if (existingResource != null) {
throw DruidException.defensive("Resource with the given identifier [%s] is already present", id);
}
return GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig);
});

// First check if the query resource id is present in the map, and if not, populate a dummy reference. This will
// block other threads from populating the map with the same query id, and is essentially same as reserving a spot in
// the map for the given query id. Since the actual allocation of the resource might take longer than expected, we
// do it out of the critical section, once we have "reserved" the spot
AtomicReference<GroupByQueryResources> reference = new AtomicReference<>(null);
AtomicReference<GroupByQueryResources> existingResource = pool.putIfAbsent(queryResourceId, reference);

// Multiple attempts made to allocate the query resource for a given resource id. Throw an exception
//noinspection VariableNotUsedInsideIf
if (existingResource != null) {
throw DruidException.defensive("Resource with the given identifier [%s] is already present", queryResourceId);
}

GroupByQueryResources resources;
try {
// We have reserved a spot in the map. Now begin the blocking call.
resources = GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig);
}
catch (Exception e) {
Contributor comment:
Can you change this to Throwable? It isn't likely that we will actually get a Throwable here that is not an Exception, but IMO it's good practice for "definitely must happen" cleanup-on-failure code to catch Throwable rather than Exception. It makes it behave more like a finally or try-with-resources, both of which would activate on any Throwable.

// Unable to allocate the resources, perform cleanup and rethrow the exception
pool.remove(queryResourceId);
throw e;
}

// Resources have been allocated, spot has been reserved. The reference would ALWAYS refer to 'null'. Refer the
// allocated resources from it
reference.compareAndSet(null, resources);
}
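
The review comment above suggests catching Throwable rather than Exception in the cleanup path. Below is a minimal sketch of how that suggestion could be applied to the two-phase reservation shown in this diff; it is a fragment that relies on the class fields above (`pool`, `mergeBufferPool`, `groupByQueryConfig`), not the merged code.

```java
// Sketch only: the two-phase reserve() with the reviewer's Throwable suggestion applied.
// Phase 1 reserves a slot in the map; phase 2 performs the blocking allocation outside
// any map-level critical section and publishes the result through the AtomicReference.
public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner)
{
  if (queryResourceId == null) {
    throw DruidException.defensive("Query resource id must be populated");
  }

  // Phase 1: putIfAbsent returns the previous mapping, so a non-null result means the
  // id has already been reserved by another caller.
  AtomicReference<GroupByQueryResources> reference = new AtomicReference<>(null);
  if (pool.putIfAbsent(queryResourceId, reference) != null) {
    throw DruidException.defensive("Resource with the given identifier [%s] is already present", queryResourceId);
  }

  GroupByQueryResources resources;
  try {
    // Phase 2: potentially blocks up to the query timeout while acquiring merge buffers.
    resources = GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig);
  }
  catch (Throwable t) {
    // Catching Throwable (per the review comment) so the reserved slot is released even on
    // Errors, similar to a finally / try-with-resources. Java's precise-rethrow rules let
    // this compile as long as prepareResource declares no checked exceptions.
    pool.remove(queryResourceId);
    throw t;
  }

  // Publish the allocated resources; the reference still holds null at this point.
  reference.compareAndSet(null, resources);
}
```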

/**
@@ -125,17 +153,35 @@
@Nullable
public GroupByQueryResources fetch(QueryResourceId queryResourceId)
{
return pool.get(queryResourceId);
AtomicReference<GroupByQueryResources> resourcesReference = pool.get(queryResourceId);
if (resourcesReference == null) {
// There weren't any resources allocated corresponding to the provided resource id
return null;
}
GroupByQueryResources resource = resourcesReference.get();
if (resource == null) {
throw DruidException.defensive(
"Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found",
queryResourceId
);
}
return resource;
}

/**
* Removes the entry corresponding to the unique id from the map, and cleans up the resources.
*/
public void clean(QueryResourceId queryResourceId)
{
GroupByQueryResources resources = pool.remove(queryResourceId);
if (resources != null) {
resources.close();
AtomicReference<GroupByQueryResources> resourcesReference = pool.remove(queryResourceId);
if (resourcesReference != null) {
GroupByQueryResources resource = resourcesReference.get();
// Reference should refer to a non-empty resource
if (resource == null) {
throw DruidException.defensive(
"Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found",
queryResourceId
);
}
resource.close();
}
}
}
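
For context on the deadlock itself: the previous implementation ran the blocking prepareResource() call inside ConcurrentHashMap.compute(), which holds the lock for the key's bin while the remapping function executes. If a second query whose resource id hashes to the same bin enters compute() and blocks waiting for a merge buffer, the first query can no longer remove() its entry to release that buffer. The following is a self-contained sketch of that interleaving, using a one-permit Semaphore in place of the real merge-buffer pool; the names and keys are illustrative, not Druid APIs, and running it simply hangs, which is the point.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Illustrative only: reproduces the shape of the deadlock this PR removes.
public class ComputeDeadlockSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    ConcurrentHashMap<Integer, String> pool = new ConcurrentHashMap<>();
    Semaphore mergeBuffers = new Semaphore(1); // single merge buffer

    // Thread 1: acquires the only buffer inside compute(), later tries to release it via remove().
    Thread t1 = new Thread(() -> {
      pool.compute(1, (k, v) -> {
        mergeBuffers.acquireUninterruptibly();
        return "resources-1";
      });
      sleepQuietly(1_000);       // give thread 2 time to enter compute() on the same bin
      pool.remove(1);            // blocked: thread 2 holds the bin lock...
      mergeBuffers.release();    // ...so the buffer is never released
    });

    // Thread 2: blocks inside compute() waiting for a buffer while holding the bin lock.
    Thread t2 = new Thread(() -> {
      sleepQuietly(200);
      // 1 and 17 typically land in the same bin of a default-sized ConcurrentHashMap; the real
      // deadlock only needs two query resource ids whose hashes collide (as in the test below).
      pool.compute(17, (k, v) -> {
        mergeBuffers.acquireUninterruptibly(); // waits forever for thread 1's buffer
        return "resources-2";
      });
    });

    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }

  private static void sleepQuietly(long millis)
  {
    try {
      Thread.sleep(millis);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```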
@@ -142,6 +142,8 @@ public GroupingEngine(
* {@link GroupByMergingQueryRunner} for a particular query. The resources are to be acquired once throughout the
* execution of the query, or need to be re-acquired (if needed). Users must ensure that throughout the execution,
* a query already holding the resources shouldn't request for more resources, because that can cause deadlocks.
* <p>
* This method throws an exception if it is not able to allocate sufficient resources required for the query to succeed
*/
public static GroupByQueryResources prepareResource(
GroupByQuery query,
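Related to the note above that reserve() can block up to the query's timeout (the test below deliberately sets the timeout to 0, i.e. block indefinitely): a caller that prefers failing fast can bound the wait through the query context. The sketch below reuses the builder calls from the test in this PR; the pool variable, the resource id, and the 5-second value are illustrative, and the timeout is assumed to be in milliseconds.

```java
// Sketch: bound how long the merge-buffer acquisition may block by setting a finite query timeout.
GroupByQuery boundedQuery = GroupByQuery.builder()
    .setInterval(Intervals.ETERNITY)
    .setDataSource("foo")
    .setDimensions(ImmutableList.of(new DefaultDimensionSpec("dim2", "_d0")))
    .setGranularity(Granularities.ALL)
    .setContext(ImmutableMap.of("timeout", 5_000)) // assumed milliseconds; 0 would block indefinitely
    .build();

// If the buffers cannot be acquired in time, prepareResource() (and therefore reserve())
// throws instead of waiting forever, per the javadoc added above.
groupByResourcesReservationPool.reserve(new QueryResourceId("bounded-query"), boundedQuery, true);
```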
@@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.groupby;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.QueryResourceId;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

public class GroupByResourcesReservationPoolTest
{

/**
* CONFIG + QUERY require exactly 1 merge buffer to succeed if 'willMergeRunners' is true while allocating the resources
*/
private static final GroupByQueryConfig CONFIG = new GroupByQueryConfig();
private static final GroupByQuery QUERY = GroupByQuery.builder()
.setInterval(Intervals.ETERNITY)
.setDataSource("foo")
.setDimensions(
ImmutableList.of(
new DefaultDimensionSpec("dim2", "_d0")
)
)
.setGranularity(Granularities.ALL)
.setContext(
ImmutableMap.of("timeout", 0)
) // Query can block indefinitely
.build();

/**
* This test confirms that the interleaved GroupByResourcesReservationPool.reserve() and GroupByResourcesReservationPool.clean()
* between multiple threads succeed. It is specifically designed to test the case when the operations are interleaved in the
* following manner:
* <p>
* THREAD1 THREAD2
* pool.reserve(query1)
* pool.reserve(query2)
* pool.clean(query1)
* <p>
* This test assumes a few things about the implementation of the interfaces, which are laid out in the comments.
* <p>
* The test should complete under 10 seconds, and the majority of the time would be consumed by waiting for the thread
* that sleeps for 5 seconds
Contributor comment:
I don't think we should add new unit tests that have sleeps; the test suite takes long enough to run already. They are also a sign of a test that is not robust.

Is it possible to rewrite this test to not use a sleep? If not, I'd suggest having it be @Ignore so it doesn't run on every test suite run. Then include a comment in the code of GroupByResourcesReservationPool itself that says if a future developer is changing the logic, they should run this test manually to ensure they aren't introducing a deadlock.

Contributor Author (@LakshSingla, May 2, 2024) replied:
Is it possible to rewrite this test to not use a sleep?

The problem I am running into at this point is that I want to signal to Thread1 that Thread2 has called the reserve() operation, however, the reserve() itself is blocking. I have tried the polling approach, using synchronized blocks and the current method, however they all run into the same blocker - there's no way to signal from a thread that it has called a blocking operation (before its completion). Any suggestions on how I can achieve this?

Else I'll annotate the test with @Ignore

*/
@Ignore(
"Isn't run as a part of CI since it sleeps for 5 seconds. Callers must run the test manually if any changes are made "
+ "to the corresponding class"
)
@Test(timeout = 100_000L)
public void testInterleavedReserveAndRemove()
{
ExecutorService executor = Execs.multiThreaded(3, "group-by-resources-reservation-pool-test-%d");

// Sanity checks that the query will acquire exactly one merge buffer. This safeguards the test being useful in
// case the merge buffer acquisition code changes to acquire less than one merge buffer (the test would be
// useless in that case) or more than one merge buffer (the test would incorrectly fail in that case)
Assert.assertEquals(
1,
GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(CONFIG, QUERY)
+ GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(QUERY)
);

// Blocking pool with a single buffer, which means only one of the queries can succeed at a time
BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1);
GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);

// Latch indicating that the first thread has called reservationPool.reserve()
CountDownLatch reserveCalledByFirstThread = new CountDownLatch(1);
// Latch indicating that the second thread has called reservationPool.reserve()
CountDownLatch reserveCalledBySecondThread = new CountDownLatch(1);
// Latch indicating that all the threads have been completed successfully. Main thread waits on this latch before exiting
CountDownLatch threadsCompleted = new CountDownLatch(2);

// THREAD 1
executor.submit(() -> {

QueryResourceId queryResourceId1 = new QueryResourceId("test-id-1")
{
@Override
public int hashCode()
{
// IMPORTANT ASSUMPTION: For the test to be useful, it assumes that under the hood we are using a
// ConcurrentHashMap<QueryResourceId, GroupByResources> (or a concurrent map with similar implementation) that
// implements granular locking of the nodes
// The hashCode of the queryResourceId used in Thread1 and Thread2 is the same. Therefore, both the queryIds
// would be guarded by the same lock
return 10;
}

@Override
public boolean equals(Object o)
{
return super.equals(o);
}
};
groupByResourcesReservationPool.reserve(queryResourceId1, QUERY, true);
reserveCalledByFirstThread.countDown();
try {
reserveCalledBySecondThread.await();
}
catch (InterruptedException e) {
Assert.fail("Interrupted while waiting for second reserve call to be made");
}
groupByResourcesReservationPool.clean(queryResourceId1);
threadsCompleted.countDown();
});

// THREAD 2
executor.submit(() -> {
try {
reserveCalledByFirstThread.await();
}
catch (InterruptedException e) {
Assert.fail("Interrupted while waiting for first reserve call to be made");
}

QueryResourceId queryResourceId2 = new QueryResourceId("test-id-2")
{
@Override
public int hashCode()
{
return 10;
}

@Override
public boolean equals(Object o)
{
return super.equals(o);
}
};

// Since the reserve() call is blocking, we need to execute it separately, so that we can count down the latch
// and inform Thread1 the reserve call has been made by this thread
executor.submit(() -> {
groupByResourcesReservationPool.reserve(queryResourceId2, QUERY, true);
threadsCompleted.countDown();
});
try {
// This sleep call "ensures" that the statement pool.reserve(queryResourceId2) is called before we release the
// latch (that will cause Thread1 to release the acquired resources). It still doesn't guarantee the previous
// statement, however that's the best we can do, given that reserve() is blocking
Thread.sleep(5_000);
}
catch (InterruptedException e) {
Assert.fail("Interrupted while sleeping");
}
reserveCalledBySecondThread.countDown();
});

try {
threadsCompleted.await();
}
catch (InterruptedException e) {
Assert.fail("Interrupted while waiting for the threads to complete");
}
}

@Test
public void testMultipleSimultaneousAllocationAttemptsFail()
{
BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1);
GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);
QueryResourceId queryResourceId = new QueryResourceId("test-id");

groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);

Assert.assertThrows(
DruidException.class,
() -> groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true)
);
}

@Test
public void testMultipleSequentialAllocationAttemptsSucceed()
{
BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1);
GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);
QueryResourceId queryResourceId = new QueryResourceId("test-id");

groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);
GroupByQueryResources oldResources = groupByResourcesReservationPool.fetch(queryResourceId);

// Cleanup the resources
groupByResourcesReservationPool.clean(queryResourceId);

// Repeat the calls
groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);
GroupByQueryResources newResources = groupByResourcesReservationPool.fetch(queryResourceId);
Assert.assertNotNull(newResources);

Assert.assertNotSame(oldResources, newResources);
}
}