Limit the subquery results by memory usage (#13952)
Users can now add a guardrail to prevent subquery results from exceeding a set number of bytes by setting druid.server.http.maxSubqueryBytes in the Broker's config or maxSubqueryBytes in the query context. This feature is experimental for now and defaults back to row-based limiting if it cannot determine the accurate size of the results consumed by the query.
LakshSingla authored Jun 26, 2023
1 parent d7c9c2f commit 1647d5f
Showing 52 changed files with 4,206 additions and 943 deletions.
14 changes: 14 additions & 0 deletions docs/configuration/index.md
@@ -1840,6 +1840,19 @@ This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=h
|--------|-----------|-------|
|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be an integer in the range 1 to 100, and will be rounded up|No default, must be set if using this mode|

##### Guardrails for materialization of subqueries
Druid stores subquery rows in temporary tables that live in the Java heap, so it is good practice to avoid large subqueries in Druid.
Druid therefore provides built-in guardrails to prevent queries from generating subquery results that can exhaust the heap space.
These guardrails can be set at the cluster level and overridden per query as desired.
The cluster admin can set the following guardrails to limit the subquery results:

1. `druid.server.http.maxSubqueryRows` in the Broker's config to set a default for the entire cluster, or `maxSubqueryRows` in the query context, to set an upper limit on the number of rows a subquery can generate
2. `druid.server.http.maxSubqueryBytes` in the Broker's config to set a default for the entire cluster, or `maxSubqueryBytes` in the query context, to set an upper limit on the number of bytes a subquery can generate

Note that limiting subqueries by bytes is a newer feature and is therefore experimental, since it materializes the results in a different format.

If you choose to modify or set any of the above limits, you must also consider the heap size of all Brokers, Historicals, and task Peons that process data for the subqueries, so that it can accommodate the subquery results.
There is no formula to calculate the correct value; trial and error is the best approach.
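
For example, a cluster-level configuration might look like the following. This is an illustrative sketch only: the values are placeholders, not recommendations, and should be tuned to the heap sizes of your Brokers, Historicals, and task Peons.

```properties
# Default row-based guardrail for every query on the cluster.
druid.server.http.maxSubqueryRows=100000
# Opt in to the experimental byte-based guardrail (disabled by default with -1).
druid.server.http.maxSubqueryBytes=100000000
```

Individual queries can override these defaults by setting `maxSubqueryRows` or `maxSubqueryBytes` in their query context.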

###### 'Manual' laning strategy
This laning strategy is best suited for cases where one or more external applications which query Druid are capable of manually deciding what lane a given query should belong to. Configured with a map of lane names to percent or exact max capacities, queries with a matching `lane` parameter in the [query context](../querying/query-context.md) will be subjected to those limits.
@@ -1862,6 +1875,7 @@ Druid uses Jetty to serve HTTP requests. Each query being processed consumes a s
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data processes such as Historicals and realtime processes to execute a query. Queries that exceed this limit will fail. This is an advance configuration that allows to protect in case Broker is under heavy load and not utilizing the data gathered in memory fast enough and leading to OOMs. This limit can be further reduced at query time using `maxScatterGatherBytes` in the context. Note that having large limit is not necessarily bad if broker is never under heavy concurrent load in which case data gathered is processed quickly and freeing up the memory used. Human-readable format is supported, see [here](human-readable-byte.md). |Long.MAX_VALUE|
|`druid.server.http.maxSubqueryRows`|Maximum number of rows from all subqueries per query. Druid stores the subquery rows in temporary tables that live in the Java heap. `druid.server.http.maxSubqueryRows` is a guardrail to prevent the system from exhausting available heap. When a subquery exceeds the row limit, Druid throws a resource limit exceeded exception: "Subquery generated results beyond maximum."<br /><br />It is a good practice to avoid large subqueries in Druid. However, if you choose to raise the subquery row limit, you must also increase the heap size of all Brokers, Historicals, and task Peons that process data for the subqueries to accommodate the subquery results.<br /><br />There is no formula to calculate the correct value. Trial and error is the best approach.|100000|
|`druid.server.http.maxSubqueryBytes`|Maximum number of bytes from all subqueries per query. Since the results are stored on the Java heap, `druid.server.http.maxSubqueryBytes` is a guardrail like `druid.server.http.maxSubqueryRows` to prevent the heap space from being exhausted. When a subquery exceeds the byte limit, Druid throws a resource limit exceeded exception. A negative value disables the byte-based guardrail. See the docs for `druid.server.http.maxSubqueryRows` for guidance on choosing a suitable value for a cluster. This is an experimental feature for now because it materializes the results in a different format.|-1|
|`druid.server.http.gracefulShutdownTimeout`|The maximum amount of time Jetty waits after receiving shutdown signal. After this timeout the threads will be forcefully shutdown. This allows any queries that are executing to complete(Only values greater than zero are valid).|`PT30S`|
|`druid.server.http.unannouncePropagationDelay`|How long to wait for ZooKeeper unannouncements to propagate before shutting down Jetty. This is a minimum and `druid.server.http.gracefulShutdownTimeout` does not start counting down until after this period elapses.|`PT0S` (do not wait)|
|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](../querying/query-context.md) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE|
9 changes: 7 additions & 2 deletions docs/querying/query-execution.md
@@ -85,10 +85,15 @@ their "base" (bottom-leftmost) datasource, as described in the [join](#join) sec
the results are brought back to the Broker. Then, the Broker continues on with the rest of the query as if the subquery
was replaced with an inline datasource.

In most cases, Druid buffers subquery results in memory on the Broker before the rest of the query proceeds. Therefore, subqueries execute sequentially. The total number of rows buffered across all subqueries of a given query cannot exceed the [`druid.server.http.maxSubqueryRows`](../configuration/index.md) which defaults to 100000 rows. Otherwise Druid throws a resource limit exceeded exception: "Subquery generated results beyond maximum."
In most cases, Druid buffers subquery results in memory on the Broker before the rest of the query proceeds.
Therefore, subqueries execute sequentially. The total number of rows buffered across all subqueries of a given query
cannot exceed [`druid.server.http.maxSubqueryRows`](../configuration/index.md), which defaults to 100000 rows, or the
[`druid.server.http.maxSubqueryBytes`](../configuration/index.md) if set. Otherwise, Druid throws a resource limit exceeded
exception.
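
For example, a query that materializes large subquery results can override the byte-based limit for just that query through its query context. This is an illustrative sketch: the query text and the limit value are placeholders.

```json
{
  "query": "SELECT dim, COUNT(*) FROM (SELECT dim FROM wikipedia GROUP BY dim) GROUP BY dim",
  "context": {
    "maxSubqueryBytes": 100000000
  }
}
```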

There is one exception: if the outer query and all subqueries are the [groupBy](groupbyquery.md) type, then subquery
results can be processed in a streaming fashion and the `druid.server.http.maxSubqueryRows` limit does not apply.
results can be processed in a streaming fashion and the `druid.server.http.maxSubqueryRows` and `druid.server.http.maxSubqueryBytes`
limits do not apply.

### `join`

@@ -30,14 +30,15 @@
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.InlineSegmentWrangler;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.timeline.SegmentId;

import java.util.function.Consumer;

/**
* Reads {@link InlineInputSlice} using {@link SegmentWrangler} (which is expected to contain an
* {@link org.apache.druid.segment.InlineSegmentWrangler}).
* {@link InlineSegmentWrangler}).
*/
public class InlineInputSliceReader implements InputSliceReader
{
@@ -149,13 +149,6 @@ public void testUnplannableQueries()

}

@Ignore
@Override
public void testMaxSubqueryRows()
{

}

@Ignore
@Override
public void testQueryWithMoreThanMaxNumericInFilter()
@@ -71,6 +71,10 @@ private FrameReader(
/**
* Create a reader for frames with a given {@link RowSignature}. The signature must exactly match the frames to be
* read, or else behavior is undefined.
* If the columnType is null, we store the data as {@link ColumnType#NESTED_DATA}. This can be done when we know that
* the data we receive can be serialized and deserialized (serded) generically as nested data. It is currently used on
* the Brokers to store data with unknown types into frames.
* @param signature signature used to generate the reader
*/
public static FrameReader create(final RowSignature signature)
{
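
A minimal sketch of the nested-data fallback described in the javadoc above, assuming a caller that knows one column's type and not the other's (the column names here are made up):

```java
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

// "value" has no declared type, so the reader falls back to ColumnType.NESTED_DATA for it.
RowSignature signature = RowSignature.builder()
                                     .add("dim", ColumnType.STRING)
                                     .add("value", null)
                                     .build();
FrameReader reader = FrameReader.create(signature);
```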
@@ -19,17 +19,26 @@

package org.apache.druid.frame.segment;

import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.filter.BoundFilter;
import org.apache.druid.segment.filter.Filters;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class FrameCursorUtils
{
@@ -67,4 +76,63 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval)
);
}
}

/**
* Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor
* and writes the columns to the frames.
*
* @param cursor Cursor to write to the frame
* @param frameWriterFactory Frame writer factory to write to the frame.
* Determines the signature of the rows that are written to the frames
*/
public static Sequence<Frame> cursorToFrames(
Cursor cursor,
FrameWriterFactory frameWriterFactory
)
{

return Sequences.simple(
() -> new Iterator<Frame>()
{
@Override
public boolean hasNext()
{
return !cursor.isDone();
}

@Override
public Frame next()
{
// Make sure the cursor still has rows before writing. This guarantees that if no row gets written below,
// it is because a single row is larger than the memory provided by the MemoryAllocators of the supplied
// factory, and not because the cursor was already empty.
if (!hasNext()) {
throw new NoSuchElementException();
}
boolean firstRowWritten = false;
Frame frame;
try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
while (!cursor.isDone()) {
if (!frameWriter.addSelection()) {
break;
}
firstRowWritten = true;
cursor.advance();
}

if (!firstRowWritten) {
throw DruidException
.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
.build("Subquery's row size exceeds the frame size, therefore the subquery's row cannot be written "
+ "to the frame. This is a non-configurable static limit that can only be modified by the "
+ "developer.");
}

frame = Frame.wrap(frameWriter.toByteArray());
}
return frame;
}
}
);
}
}
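
A rough usage sketch, assuming a `cursor` and a `frameWriterFactory` are already in hand (obtaining them is outside this hunk): the returned sequence is lazy, and materializing it writes every remaining row of the cursor into frames.

```java
// Lazily convert the cursor's rows into frames, then materialize them on the heap.
Sequence<Frame> frames = FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory);
List<Frame> materialized = frames.toList();
```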
@@ -52,7 +52,7 @@ public class FrameQueryableIndex implements QueryableIndex
private final RowSignature signature;
private final List<FrameColumnReader> columnReaders;

FrameQueryableIndex(
public FrameQueryableIndex(
final Frame frame,
final RowSignature signature,
final List<FrameColumnReader> columnReaders
@@ -291,4 +291,15 @@ private static boolean areKeyColumnsPrefixOfSignature(

return true;
}

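/**
 * Returns a copy of the given {@link RowSignature} in which every column whose type is unknown (absent) is
 * assigned {@link ColumnType#NESTED_DATA}. Columns with known types are carried over unchanged.
 */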
public static RowSignature replaceUnknownTypesWithNestedColumns(final RowSignature rowSignature)
{
RowSignature.Builder retBuilder = RowSignature.builder();
for (int i = 0; i < rowSignature.size(); ++i) {
String columnName = rowSignature.getColumnName(i);
ColumnType columnType = rowSignature.getColumnType(i).orElse(ColumnType.NESTED_DATA);
retBuilder.add(columnName, columnType);
}
return retBuilder.build();
}
}
@@ -32,6 +32,8 @@
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.FrameBasedInlineDataSource;
import org.apache.druid.query.FrameBasedInlineDataSourceSerializer;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContextDeserializer;
import org.joda.time.DateTimeZone;
@@ -52,6 +54,8 @@ public DruidDefaultSerializersModule()

JodaStuff.register(this);

addSerializer(FrameBasedInlineDataSource.class, new FrameBasedInlineDataSourceSerializer());

addDeserializer(
DateTimeZone.class,
new JsonDeserializer<DateTimeZone>()
