Skip to content

Commit

Permalink
Fix deadlock that can occur while merging group by results (#15420)
Browse files Browse the repository at this point in the history
This PR prevents such a deadlock from happening by acquiring the merge buffers in a single place and passing it down to the runner that might need it.
  • Loading branch information
LakshSingla authored Apr 22, 2024
1 parent cff5d1e commit b9bbde5
Show file tree
Hide file tree
Showing 60 changed files with 1,427 additions and 490 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
Expand Down Expand Up @@ -372,19 +373,21 @@ public String getFormatString()
};

final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergePool, config);
final GroupingEngine groupingEngine = new GroupingEngine(
druidProcessingConfig,
configSupplier,
bufferPool,
mergePool,
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);

factory = new GroupByQueryRunnerFactory(
groupingEngine,
new GroupByQueryQueryToolChest(groupingEngine)
new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.planning.DataSourceAnalysis;
Expand Down Expand Up @@ -356,16 +357,18 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory(
bufferSupplier,
processingConfig.getNumMergeBuffers()
);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergeBufferPool, config);
final GroupingEngine groupingEngine = new GroupingEngine(
processingConfig,
configSupplier,
bufferPool,
mergeBufferPool,
groupByResourcesReservationPool,
mapper,
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool);
return new GroupByQueryRunnerFactory(groupingEngine, toolChest);
}

Expand Down Expand Up @@ -469,7 +472,7 @@ private <T> List<T> runQuery()
toolChestWarehouse.getToolChest(query)
)
.applyPreMergeDecoration()
.mergeResults()
.mergeResults(true)
.applyPostMergeDecoration();

//noinspection unchecked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
Expand Down Expand Up @@ -487,19 +488,21 @@ public String getFormatString()
};

final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergePool, config);
final GroupingEngine groupingEngine = new GroupingEngine(
druidProcessingConfig,
configSupplier,
bufferPool,
mergePool,
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);

factory = new GroupByQueryRunnerFactory(
groupingEngine,
new GroupByQueryQueryToolChest(groupingEngine)
new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ public void testGroupByWithDistinctCountAgg() throws Exception
Iterable<ResultRow> results = FluentQueryRunner
.create(factory.createRunner(incrementalIndexSegment), factory.getToolchest())
.applyPreMergeDecoration()
.mergeResults()
.mergeResults(true)
.applyPostMergeDecoration()
.run(QueryPlus.wrap(query))
.run(QueryPlus.wrap(GroupByQueryRunnerTestHelper.populateResourceId(query)))
.toList();

List<ResultRow> expectedResults = Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,35 @@ public MaterializedViewQueryQueryToolChest(
{
this.warehouse = warehouse;
}

@Override
public QueryRunner mergeResults(QueryRunner runner)
{
return new QueryRunner() {
return new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
Query realQuery = getRealQuery(queryPlus.getQuery());
return warehouse.getToolChest(realQuery)
.mergeResults(runner)
.run(queryPlus.withQuery(realQuery), responseContext);
}
};
}

@Override
public QueryRunner mergeResults(QueryRunner runner, boolean willMergeRunner)
{
return new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
Query realQuery = getRealQuery(queryPlus.getQuery());
return warehouse.getToolChest(realQuery).mergeResults(runner).run(queryPlus.withQuery(realQuery), responseContext);
return warehouse.getToolChest(realQuery)
.mergeResults(runner, willMergeRunner)
.run(queryPlus.withQuery(realQuery), responseContext);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testDecorateObjectMapper() throws IOException
QueryToolChest queryToolChest =
new MaterializedViewQueryQueryToolChest(new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null))
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null, null))
.build()
));

Expand Down Expand Up @@ -186,7 +186,7 @@ public void testDecorateObjectMapperMaterializedViewQuery() throws IOException
QueryToolChest materializedViewQueryQueryToolChest =
new MaterializedViewQueryQueryToolChest(new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null))
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null, null))
.build()
));

Expand Down Expand Up @@ -245,7 +245,7 @@ public void testGetRealQuery()
MaterializedViewQueryQueryToolChest materializedViewQueryQueryToolChest =
new MaterializedViewQueryQueryToolChest(new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null))
.put(GroupByQuery.class, new GroupByQueryQueryToolChest(null, null))
.build()
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
Expand All @@ -67,6 +68,9 @@ public class MapVirtualColumnGroupByTest extends InitializedNullHandlingTest
public void setup() throws IOException
{
final IncrementalIndex incrementalIndex = MapVirtualColumnTestBase.generateIndex();
final GroupByQueryConfig config = new GroupByQueryConfig();
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1), config);
final GroupingEngine groupingEngine = new GroupingEngine(
new DruidProcessingConfig()
{
Expand Down Expand Up @@ -94,17 +98,17 @@ public int getNumThreads()
return 1;
}
},
GroupByQueryConfig::new,
() -> config,
new StupidPool<>("map-virtual-column-groupby-test", () -> ByteBuffer.allocate(1024)),
new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1),
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new DefaultObjectMapper(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);

final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
groupingEngine,
new GroupByQueryQueryToolChest(groupingEngine)
new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool)
);

runner = QueryRunnerTestHelper.makeQueryRunner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public void testGroupByWithApproximateHistogramAgg()
)
);

Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(GroupByQueryRunnerTestHelper.populateResourceId(query)))
.toList();
TestHelper.assertExpectedObjects(expectedResults, results, "approx-histo");
}

Expand Down Expand Up @@ -231,7 +232,8 @@ public void testGroupByWithSameNameComplexPostAgg()
)
);

Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(GroupByQueryRunnerTestHelper.populateResourceId(query)))
.toList();
TestHelper.assertExpectedObjects(expectedResults, results, "approx-histo");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ public void testGroupByWithFixedHistogramAgg()
)
);

Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(GroupByQueryRunnerTestHelper.populateResourceId(query)))
.toList();
TestHelper.assertExpectedObjects(expectedResults, results, "fixed-histo");
}

Expand Down Expand Up @@ -233,7 +234,8 @@ public void testGroupByWithSameNameComplexPostAgg()
)
);

Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(GroupByQueryRunnerTestHelper.populateResourceId(query)))
.toList();
TestHelper.assertExpectedObjects(expectedResults, results, "fixed-histo");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ public static void retryUntil(

LOG.info(
"Attempt[%d/%d] did not pass: Task %s still not complete. Next retry in %d ms",
currentTry, retryCount, taskMessage, delayInMillis
currentTry,
retryCount,
taskMessage,
delayInMillis
);
try {
Thread.sleep(delayInMillis);
Expand All @@ -83,10 +86,10 @@ public static void retryUntil(
if (currentTry > retryCount) {
if (lastException != null) {
throw new ISE(
lastException,
"Max number of retries[%d] exceeded for Task[%s]. Failing.",
retryCount,
taskMessage,
lastException
taskMessage
);
} else {
throw new ISE(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,14 @@ public FluentQueryRunner<T> postProcess(PostProcessingOperator<T> postProcessing
return from(postProcessing != null ? postProcessing.postProcess(baseRunner) : baseRunner);
}

public FluentQueryRunner<T> mergeResults()
/**
* Delegates to {@link QueryToolChest#mergeResults(QueryRunner, boolean)}.
*
* @see QueryToolChest#mergeResults(QueryRunner, boolean)
*/
public FluentQueryRunner<T> mergeResults(boolean willMergeRunner)
{
return from(toolChest.mergeResults(baseRunner));
return from(toolChest.mergeResults(baseRunner, willMergeRunner));
}

public FluentQueryRunner<T> map(final Function<QueryRunner<T>, QueryRunner<T>> mapFn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,10 @@ public boolean isWindowingStrictValidation()
);
}

public QueryResourceId getQueryResourceId()
{
return new QueryResourceId(getString(QueryContexts.QUERY_RESOURCE_ID));
}

public String getBrokerServiceName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ public class QueryContexts
public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit";
public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
public static final String WINDOWING_STRICT_VALIDATION = "windowingStrictValidation";

// Unique identifier for the query, that is used to map the global shared resources (specifically merge buffers) to the
// query's runtime
public static final String QUERY_RESOURCE_ID = "queryResourceId";

// SQL query context keys
public static final String CTX_SQL_QUERY_ID = BaseQuery.SQL_QUERY_ID;
Expand Down
Loading

0 comments on commit b9bbde5

Please sign in to comment.