Limit the subquery results by memory usage #13952
Conversation
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java (resolved)
Partial review!
processing/src/main/java/org/apache/druid/query/InlineDataSource.java (outdated; resolved ×3)
@@ -300,7 +363,7 @@ public boolean equals(Object o)
      return false;
    }
    InlineDataSource that = (InlineDataSource) o;
    return rowsEqual(rows, that.rows) &&
Maybe compare frames directly if possible and save on the cost of initializing various frameReaders and then deserializing them to rows.

I'll check if this works since that would be faster. InlineDataSource's equality is used only in the tests AFAIK, therefore we should be fine if it doesn't work as such.
@@ -565,10 +616,14 @@ private static <T, QueryType extends Query<T>> InlineDataSource toInlineDataSour
    final Sequence<T> results,
    final QueryToolChest<T, QueryType> toolChest,
    final AtomicInteger limitAccumulator,
We should never be using both limits, I guess. Can we remove the additional 3 params and just pass another param called type, and re-use the same limit variables?

I am not sure of the use case. Theoretically, we can pass both limits and error the query out if either one of them is reached. Is that the behavior we want to encourage, or do we want the user to give only one of the limits?

If we have two limits, we might want to give them names so we know which limit the limitAccumulator is accumulating: row count or memory bytes.
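The naming suggestion above can be sketched as follows; the class and field names are hypothetical, purely to illustrate how two separately named accumulators remove the ambiguity about what is being counted:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: separately named accumulators make it obvious
// whether rows or bytes are being counted against a limit.
class SubqueryLimits
{
  final AtomicLong rowCountAccumulator = new AtomicLong();
  final AtomicLong memoryBytesAccumulator = new AtomicLong();

  /** Returns true while the accumulated row count stays within rowLimit. */
  boolean addRows(long rows, long rowLimit)
  {
    return rowCountAccumulator.addAndGet(rows) <= rowLimit;
  }

  /** Returns true while the accumulated byte count stays within byteLimit. */
  boolean addBytes(long bytes, long byteLimit)
  {
    return memoryBytesAccumulator.addAndGet(bytes) <= byteLimit;
  }
}
```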
Frame frame = null;

// Try to serialize the results into a frame only if the memory limit is set on the server or the query
if (memoryLimitSet) {
We should serialize the frames only when memoryLimit is set; otherwise the old code path should be invoked.
@Test
public void testTimeseriesOnGroupByOnTableErrorTooLarge()
I think tests around
- String cols
- Long cols
- Double/float cols
- Complex cols
- Array cols
- Nested cols
Would help us build confidence in the feature.
Some tests to check out:
CalciteQueryTests#testMaxSubqueryRows
GroupByQueryRunnerTest#testGroupByMaxRowsLimitContextOverride
Ideally, all tests which have a subquery should be executed using the new code path, but since it's feature-flagged it might not be a hard requirement.
processing/src/main/java/org/apache/druid/query/InlineDataSource.java (outdated; resolved ×2)
@@ -300,7 +363,7 @@ public boolean equals(Object o)
      return false;
    }
    InlineDataSource that = (InlineDataSource) o;
-   return rowsEqual(rows, that.rows) &&
+   return rowsEqual(getRowsAsList(), that.getRowsAsList()) &&
Does it even make semantic sense to compare two input sources for equality? Are we adding this because some static check told us we need it, and not because we actually use it?
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java (outdated; resolved)
    new ArrayList<>()
);

final Cursor cursor = new InlineResultsCursor(resultList, signature);
All of this seems too complex to put inside the segment walker. For one thing, it is hard to test if it is an implementation detail. Perhaps pull out this logic into a separate class that can be unit tested extensively. For example, we'd want tests for hitting each of the limits, for handling variable-width columns, etc.
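A minimal sketch of what such an extracted class might look like (all names here are hypothetical, not the actual Druid API) — each limit can then be hit directly from a unit test:

```java
// Illustrative sketch of the reviewer's suggestion: pull the limit-enforcement
// logic out of ClientQuerySegmentWalker into a small standalone class, so unit
// tests can exercise each limit without going through the segment walker.
class SubqueryLimitChecker
{
  enum LimitKind { ROW_LIMIT, MEMORY_LIMIT, NONE }

  private final long rowLimit;
  private final long byteLimit;
  private long rows;
  private long bytes;

  SubqueryLimitChecker(long rowLimit, long byteLimit)
  {
    this.rowLimit = rowLimit;
    this.byteLimit = byteLimit;
  }

  /** Account for one batch of results; returns which limit, if any, was hit. */
  LimitKind add(long newRows, long newBytes)
  {
    rows += newRows;
    bytes += newBytes;
    if (rows > rowLimit) {
      return LimitKind.ROW_LIMIT;
    }
    if (bytes > byteLimit) {
      return LimitKind.MEMORY_LIMIT;
    }
    return LimitKind.NONE;
  }
}
```

A test for variable-width columns would then just feed batches with differing byte sizes into `add` and assert on the returned kind.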
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java (resolved)
2. Add the resultAsFrames to other toolchests 3. Refactoring 4. Handling null types
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java (fixed ×2)
2. Checkstyle 3. Precheck in the toolchest 4. Conversion to iterables for the frames
Left some comments.
Looking forward to the UTs.
processing/src/main/java/org/apache/druid/frame/field/FieldWriters.java (outdated; resolved ×2)
processing/src/main/java/org/apache/druid/frame/read/FrameReader.java (outdated; resolved)
    default:
      throw new ISE("Unrecognized frame type [%s]", frameType);
  }
}

public static FrameWriterFactory makeFrameWriterFactory(
This method seems weird. You can call the base method directly.

I added this as a separate method originally because we don't require the boolean in the rest of the cases (i.e. the original ones in MSQ). Therefore it made sense to me to hide this complexity from the callers of this method that aren't residing in the broker.
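The pattern under discussion — a thin convenience overload that hides a flag most callers never need — looks roughly like this (names and the string-based stand-in are illustrative, not the real FrameWriters API):

```java
// Illustrative only: callers outside the broker use the short overload and
// never see the extra boolean; broker-side code calls the full method.
class FrameWriterFactories
{
  static String makeFrameWriterFactory(String signature)
  {
    // Default the flag for the common (MSQ) callers.
    return makeFrameWriterFactory(signature, false);
  }

  static String makeFrameWriterFactory(String signature, boolean allowNullColumnTypes)
  {
    return signature + (allowNullColumnTypes ? ":nullable" : "");
  }
}
```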
@@ -52,6 +54,8 @@ public DruidDefaultSerializersModule()

  JodaStuff.register(this);

  addSerializer(FramesBackedInlineDataSource.class, new FramesBackedInlineDataSourceSerializer());
Can you let us know the reason why you think adding the serializer here makes sense?

I would think we don't need to serialize this, as it should exist in-memory only. So I'm also wondering where this was needed.

This serialization is required when the broker inlines the subquery results and sends the inlined query to the historicals. In that case, we serialize the Frames and FrameBasedInlineDataSource to behave as if an equivalent InlineDataSource (based on rows) had been serialized.
  frame = Frame.wrap(frameWriter.toByteArray());
}

return new FrameSignaturePair(frame, result.getRowSignature());
We are making one frame per result sequence, and each result sequence represents one segment. This does not seem very scalable. Let's leave a note here so that we can get back to this in a future PR.

Since we can have different row signatures per segment, what we can do is only start a new frame when the row signature is different. This will reduce the number of frames by a lot.
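The coalescing idea can be sketched like this; row signatures are modeled as plain strings for illustration, so this is not the actual Frame/RowSignature API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Illustrative sketch: start a new frame only when the row signature of the
// incoming result sequence differs from the previous one, instead of one
// frame per segment.
class FrameCoalescer
{
  /** Input: per-segment signatures, in order. Output: one entry per frame. */
  static List<String> coalesce(List<String> perSegmentSignatures)
  {
    final List<String> frameSignatures = new ArrayList<>();
    String current = null;
    for (String signature : perSegmentSignatures) {
      if (!Objects.equals(signature, current)) {
        frameSignatures.add(signature); // signature changed: open a new frame
        current = signature;
      }
      // otherwise keep appending rows to the current frame
    }
    return frameSignatures;
  }
}
```

With, say, 100 segments that all share one signature, this produces a single frame instead of 100.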
processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java (resolved)
processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java (resolved)
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java (outdated; resolved)
processing/src/main/java/org/apache/druid/query/QueryToolChest.java (outdated; resolved)
refactor add unit tests for serdes, and helper classes
… semantics of creating the frames, config changes
processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java (fixed)
processing/src/main/java/org/apache/druid/segment/join/table/FrameBasedIndexedTable.java (fixed ×2)
processing/src/test/java/org/apache/druid/segment/join/table/FrameBasedIndexedTableTest.java (fixed ×2)
Thanks for the reviews!
Changes LGTM.
Thanks @LakshSingla !!
{
  public enum SubqueryResultLimit
  {
    ROW_LIMIT,
Please add Javadocs here.
limitAccumulator.addAndGet(frame.getFrame().numRows());
if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
  throw ResourceLimitExceededException.withMessage(
      "Subquery generated results beyond maximum[%d] bytes",
This method needs Javadocs. Line 730 is eating up exceptions to fall back; let's document this.

What's the expected action from the user?

Updating with a more appropriate error message.
  );
}
catch (Exception e) {
  return Optional.empty();
Please add some debug line so that we know the exception here.

Why is this a debug log though? It should be WARN.

This will be executed per query, so DEBUG is more appropriate according to me; otherwise the logs will be cluttered with the exception message. Either we should:
- Keep it as DEBUG so that we don't have to see cluttered logs. This has the disadvantage that we won't be able to readily observe if we fall back to the default method/code path.
- Not catch the exception and let it propagate. The user will then report the issue and we can fix it.

The 2nd option means that there won't be a fallback in case we aren't able to convert the results to frames. Since this is a newer feature, I think we should still have a fallback till we are confident that we can convert each query; once it is more mature and the frames can handle array types (currently they can handle string arrays only), we can remove this fallback altogether and let the exception pass through.

Yeah, we shouldn't be doing the 2nd option. We can do something like below:
- log the thing as INFO.
- log the exception stack trace if debug is set in the query context.
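The policy agreed on above could look roughly like this (hypothetical helper; real code would go through Druid's logger rather than building a string):

```java
// Illustrative sketch: the fallback is reported at INFO level, and the stack
// trace is appended only when the query context has debug enabled.
class FallbackLogHelper
{
  static String formatFallbackMessage(Exception e, boolean debugEnabledInContext)
  {
    final StringBuilder message = new StringBuilder(
        "Unable to materialize the results as frames; falling back to row-based limiting: ")
        .append(e.getMessage());
    if (debugEnabledInContext) {
      // Real code would pass the exception to logger.info(msg, e) instead.
      for (StackTraceElement element : e.getStackTrace()) {
        message.append("\n  at ").append(element);
      }
    }
    return message.toString();
  }
}
```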
}

if (!firstRowWritten) {
  throw new ISE("Row size is greater than the frame size.");
Will a user ever see this error message? Please use the DruidException class instead and add an error message that's more actionable.

Yes, this can be seen at the top level. Refactored with DruidException and a more actionable error message.
Since this PR is liable to break due to merge conflicts, going ahead and merging this.
Thanks for the contribution @LakshSingla !!
Description
Overview
Currently, in the ClientQuerySegmentWalker, when data sources get inlined, they can be limited by the number of rows to prevent a query (subquery) from hogging the broker's memory. This, however, doesn't correspond well with the memory used, since a row can have multiple columns with varying amounts of data in them. Therefore, it would be better if a memory limit were also available, preventing the subquery's results from growing beyond a certain memory footprint.
This PR uses the Frame, which was introduced along with MSQ, to store the inline results. Since Frames are backed by memory, we can fetch the memory used by a frame, and correspondingly by the data source, to estimate the size consumed by the inline data source. This is a close estimate of the size taken by the subquery results.
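As a rough sketch of the accounting this enables (hypothetical names; the real implementation works on Druid's Frame objects, whereas this just sums byte sizes):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: because frames are backed by memory, summing each
// frame's byte size gives a close estimate of the inline data source's
// footprint, which can then be checked against a configured limit.
class ByteLimitedInliner
{
  static long inline(List<Long> frameSizesInBytes, long maxSubqueryBytes)
  {
    final AtomicLong memoryUsed = new AtomicLong();
    for (long frameBytes : frameSizesInBytes) {
      if (memoryUsed.addAndGet(frameBytes) >= maxSubqueryBytes) {
        throw new IllegalStateException(
            "Subquery generated results beyond maximum[" + maxSubqueryBytes + "] bytes");
      }
    }
    return memoryUsed.get();
  }
}
```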
Configuration
This PR adds two new parameters:
- The `maxSubqueryBytes` key in the query context, whose value is the upper bound on the number of bytes the subquery's results can take.
- The `useNestedForUnknownTypeInSubquery` parameter, which defaults to false but can be set to true; it controls how to handle columns whose types are unknown (i.e. the columnType is empty/null). If the parameter is set, the null types are serded as nested JSON data, which should handle most of the common cases.
Behaviour
As proposed in the review comment:
- If `maxSubqueryBytes` is not set, `maxSubqueryRows` limits the results of the subquery by the number of rows. We also execute the older code path, which doesn't materialize the results to frames.
- If `maxSubqueryBytes` is set, the results are materialized into frames and limited by the number of bytes they occupy.
Proposed changes
Supporting changes
- `FrameBasedIndexedTable` - An indexed table that works on the `FrameBasedInlineDataSource`. It indexes the key columns and provides a way to extract the columnReader for the columns of the data source.
- `FrameBasedInlineDataSource` - Inline data source which is based upon an underlying list of frames. Frames can be written using individual row signatures. The data source itself has its own signature, which is the "cumulative" signature of the underlying frames.
- `IterableRowsCursorHelper` - Creates a cursor from an iterable representing the rows of a data source.
- `ConcatCursor` - Cursor representing the concatenation of multiple underlying cursors.
- `FrameBasedInlineDataSourceSerializer` - Serializes a `FrameBasedInlineDataSource` as if it were the traditional InlineDataSource. This is done to keep the communication protocol between the broker and the data servers (historicals) unchanged.
Testing
Impact on existing deployments
The change in `ScanResultValue` shouldn't affect the upgrade process, since historicals are updated before the brokers in the proper upgrade method; therefore brokers can always assume that the `ScanResultValue` should contain the updated information (of signature). Brokers currently don't break even if the fetched `ScanResultValue` has rowSignature as null, therefore existing deployments shouldn't be affected.
Follow up
Release note
Users can now add a guardrail to prevent subquery's results from exceeding the set number of bytes by setting `druid.server.http.maxSubqueryBytes` in the Broker's config or `maxSubqueryBytes` in the query context. This feature is experimental for now and will default back to row-based limiting in case it fails to get the accurate size of the results consumed by the query.
This PR has: