
Limit the subquery results by memory usage #13952

Merged (56 commits, Jun 26, 2023)
Conversation

@LakshSingla LakshSingla commented Mar 20, 2023

Description

Overview

Currently, in the ClientQuerySegmentWalker, when data sources get inlined, they can be limited only by the number of rows, to prevent a query (subquery) from hogging the broker's memory. However, a row count does not correspond well to actual memory usage, since a row can have multiple columns holding varying amounts of data. It would therefore be better to also offer a memory limit, which prevents a subquery's results from growing beyond a certain number of bytes.

This PR uses the Frame, introduced along with MSQ, to store the inline results. Since Frames are backed by memory, we can read the memory used by each frame and, correspondingly, by the inlined data source. This gives a close estimate of the size taken up by the subquery results.

Configuration

  1. Users can set the maxSubqueryBytes key in the query context to an upper bound on the number of bytes the subquery's results may occupy (a minimal sketch of setting these keys follows this list).
  2. Often, the result types of a query are unknown to the broker. Materializing to frames requires that the types be present, which can cause results to not be materialized. Therefore, there is an additional undocumented parameter, useNestedForUnknownTypeInSubquery, which defaults to false and controls how columns with unknown types (i.e. an empty/null columnType) are handled. If it is set to true, such columns are serded as nested JSON data, which should handle most of the common cases.
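
For illustration, here is a minimal sketch of a query context carrying these keys. The keys maxSubqueryBytes and useNestedForUnknownTypeInSubquery come from this PR; the surrounding class and the example values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class SubqueryLimitContextExample
{
  // Builds a query context map with the new guardrail keys described above.
  public static Map<String, Object> buildContext()
  {
    final Map<String, Object> context = new HashMap<>();
    // Upper bound, in bytes, on the materialized subquery results (example value).
    context.put("maxSubqueryBytes", 100_000_000L);
    // Optional: serde columns with unknown types as nested JSON so they can still
    // be materialized to frames.
    context.put("useNestedForUnknownTypeInSubquery", true);
    return context;
  }
}
```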

Behaviour

As proposed in the review comments, the behaviour is as follows (a sketch of the decision logic appears after this list):

  • If maxSubqueryBytes is not set:
    • We use maxSubqueryRows to limit the results of the subquery by the number of rows, and we execute the older code path, which doesn't materialize the results to frames.
  • If maxSubqueryBytes is set:
    • If we can materialize the results of all the subqueries to frames:
      • We limit the results of the subquery by the number of bytes.
    • If we can't materialize the results of all the subqueries to frames:
      • We default to the old code path and limit the results of the subquery by the number of rows (if set).
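
A minimal sketch of that decision logic, assuming a convention where an unset maxSubqueryBytes is represented as a non-positive value. Apart from the SubqueryResultLimit values (which appear later in this PR), the names here are illustrative rather than the actual ClientQuerySegmentWalker code.

```java
public class SubqueryLimitSelection
{
  public enum SubqueryResultLimit
  {
    ROW_LIMIT,
    MEMORY_LIMIT
  }

  // Chooses which guardrail applies, mirroring the bullet list above.
  public static SubqueryResultLimit chooseLimit(long maxSubqueryBytes, boolean allSubqueriesMaterializableAsFrames)
  {
    if (maxSubqueryBytes <= 0) {
      // maxSubqueryBytes is not set: keep the older, row-count based code path.
      return SubqueryResultLimit.ROW_LIMIT;
    }
    // maxSubqueryBytes is set: use the byte limit only if every subquery can be
    // materialized to frames; otherwise fall back to row-based limiting.
    return allSubqueriesMaterializableAsFrames
           ? SubqueryResultLimit.MEMORY_LIMIT
           : SubqueryResultLimit.ROW_LIMIT;
  }
}
```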

Proposed changes

  1. ClientQuerySegmentWalker#toInlineDataSource now has distinct code paths that convert the results either to frames (new code path) or to an iterable of rows (existing code path).
  2. The query tool chests have been updated to materialize the results as frames if possible.

Supporting changes

  1. FrameBasedIndexedTable - An indexed table that works on the FrameBasedInlineDataSource. It indexes the key columns and provides a way to extract the columnReader for the columns of the data source.
  2. FrameBasedInlineDataSource - Inline data source that is based upon an underlying list of frames. Frames can be written using individual row signatures, while the data source itself has its own "cumulative" signature built from the underlying frames.
  3. IterableRowsCursorHelper - Creates a cursor from an iterable representing the rows of a data source.
  4. ConcatCursor - Cursor representing the concatenation of multiple underlying cursors (see the sketch after this list).
  5. ScanResultValue has been updated to provide the row signature whenever it is present in the underlying data sources.
  6. FrameBasedInlineDataSourceSerializer - Serializes a FrameBasedInlineDataSource as if it were the traditional InlineDataSource. This keeps the communication protocol between the broker and the data servers (historicals) unchanged.
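
As a rough illustration of the ConcatCursor idea, here is a concatenating iterator built only from standard-library types. Druid's actual Cursor interface is richer (column selectors, advance/reset), so this is an analogy rather than the real class.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class ConcatIterator<T> implements Iterator<T>
{
  private final Iterator<Iterator<T>> delegates;
  private Iterator<T> current = null;

  ConcatIterator(List<Iterator<T>> delegates)
  {
    this.delegates = delegates.iterator();
  }

  @Override
  public boolean hasNext()
  {
    // Skip past exhausted delegates until one with remaining elements is found.
    while ((current == null || !current.hasNext()) && delegates.hasNext()) {
      current = delegates.next();
    }
    return current != null && current.hasNext();
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }

  public static void main(String[] args)
  {
    Iterator<Integer> concatenated = new ConcatIterator<>(
        Arrays.asList(Arrays.asList(1, 2).iterator(), Arrays.asList(3).iterator())
    );
    concatenated.forEachRemaining(System.out::println); // prints 1, 2, 3
  }
}
```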

Testing

  • The set of changes has been tested on the existing CalciteQueryTests stack.
  • The changes have been tested on a local deployment

Impact on existing deployments

  1. This change shouldn't affect existing deployments. The communication protocol between the broker and the historicals is unchanged: the frame-based data sources are serialized as if they were traditional inline data sources.
  2. The change to ScanResultValue shouldn't affect the upgrade process, since historicals are updated before brokers in the recommended upgrade order, so brokers can always assume that the ScanResultValue contains the updated information (the signature). Brokers currently don't break even if a fetched ScanResultValue has a null rowSignature, so existing deployments shouldn't be affected.
  3. The new code paths are only exercised when the user specifies the memory limit in the query, so existing queries should continue to work as-is, without any performance impact.

Follow up

  1. Create an automatic user-friendly configuration based on the guidelines.

Release note

Users can now add a guardrail to prevent a subquery's results from exceeding a set number of bytes by setting druid.server.http.maxSubqueryBytes in the Broker's config or maxSubqueryBytes in the query context. This feature is experimental for now and falls back to row-based limiting if it cannot determine an accurate size for the results consumed by the query.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@cryptoe cryptoe left a comment

Partial review!

@@ -300,7 +363,7 @@ public boolean equals(Object o)
return false;
}
InlineDataSource that = (InlineDataSource) o;
return rowsEqual(rows, that.rows) &&

Contributor:

Maybe compare frames directly if possible and save on the cost of initializing various frameReaders and then deserializing them to rows.

Contributor Author:

I'll check if this works, since that would be faster.
InlineDataSource equality is only checked in tests AFAIK, so we should be fine even if it doesn't work as such.

@@ -565,10 +616,14 @@ private static <T, QueryType extends Query<T>> InlineDataSource toInlineDataSour
final Sequence<T> results,
final QueryToolChest<T, QueryType> toolChest,
final AtomicInteger limitAccumulator,

Contributor:

We should never be using both limits, I guess. Can we remove the additional 3 params, just pass another param called type, and re-use the same limit variables?

Contributor Author:

I am not sure of the use case. Theoretically, we can pass both limits and error the query out if either one is reached. Is that the behaviour we want to encourage, or do we want the user to give only one of the limits?

Contributor:

If we have two limits, we might want to give them names so we know which limit that limitAccumulator is accumulating: row count or memory bytes?

Frame frame = null;

// Try to serialize the results into a frame only if the memory limit is set on the server or the query
if (memoryLimitSet) {

Contributor @cryptoe, Mar 20, 2023:

We should serialize to frames only when the memoryLimit is set; otherwise the old code path should be invoked.



@Test
public void testTimeseriesOnGroupByOnTableErrorTooLarge()

Contributor:

I think tests around

  1. String cols
  2. Long cols
  3. Double/float cols
  4. Complex cols
  5. Array cols
  6. Nested cols

would help us build confidence in the feature.

Some tests to check out:

  • CalciteQueryTests#testMaxSubqueryRows
  • GroupByQueryRunnerTest#testGroupByMaxRowsLimitContextOverride

Ideally, all tests which have a subquery should be executed using the new code path, but since it's feature-flagged it might not be a hard requirement.

@@ -300,7 +363,7 @@ public boolean equals(Object o)
return false;
}
InlineDataSource that = (InlineDataSource) o;
return rowsEqual(rows, that.rows) &&
return rowsEqual(getRowsAsList(), that.getRowsAsList()) &&

Contributor:

Does it even make semantic sense to compare two input sources for equality? Are we adding this because some static check told us we need it, and not because we actually use it?


new ArrayList<>()
);

final Cursor cursor = new InlineResultsCursor(resultList, signature);

Contributor:

All of this seems too complex to put inside the segment walker. For one thing, it is hard to test if it is an implementation detail. Perhaps pull out this logic into a separate class that can be unit tested extensively. For example, we'd want tests for hitting each of the limits, for handling variable-width columns, etc.

@cryptoe cryptoe left a comment

Left some comments.
Looking forward to the UTs.

default:
throw new ISE("Unrecognized frame type [%s]", frameType);
}
}

public static FrameWriterFactory makeFrameWriterFactory(

Contributor:

This method seems weird. You can call the base method directly.

Contributor Author:

Added this as a separate method originally because we don't require the boolean in the rest of the cases (i.e. the original ones in MSQ). Therefore it made sense to me to hide this complexity from the callers of this method that aren't residing in the broker.

@@ -52,6 +54,8 @@ public DruidDefaultSerializersModule()

JodaStuff.register(this);

addSerializer(FramesBackedInlineDataSource.class, new FramesBackedInlineDataSourceSerializer());

Contributor:

Can you let us know why you think adding the serializer here makes sense?

Contributor:

I would think we don't need to serialize this, as it should exist in-memory only. So I'm also wondering where this was needed.

Contributor Author:

This serialization is required when the broker inlines the subquery results and sends the inlined query to the historicals. In that case, we serialize the Frames and the FrameBasedInlineDataSource so that they behave as if an equivalent row-based InlineDataSource had been serialized.
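
For context, here is a minimal sketch of what such a serializer can look like: it writes the frame-backed source in the JSON shape of a row-based inline data source. The view interface, accessor names, and JSON field names are assumptions for illustration; the actual FrameBasedInlineDataSourceSerializer and InlineDataSource JSON should be checked in the code.

```java
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import java.io.IOException;
import java.util.List;

final class FrameBackedAsInlineSerializer extends JsonSerializer<FrameBackedAsInlineSerializer.DataSourceView>
{
  // Hypothetical read-only view of the data source: column names plus materialized rows.
  interface DataSourceView
  {
    List<String> getColumnNames();
    List<Object[]> getRowsAsList();
  }

  @Override
  public void serialize(DataSourceView value, JsonGenerator gen, SerializerProvider serializers) throws IOException
  {
    // Emit the same overall shape as a traditional inline data source so the
    // historicals can deserialize it without any protocol change.
    gen.writeStartObject();
    gen.writeStringField("type", "inline");
    gen.writeObjectField("columnNames", value.getColumnNames());
    gen.writeObjectField("rows", value.getRowsAsList());
    gen.writeEndObject();
  }
}
```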

frame = Frame.wrap(frameWriter.toByteArray());
}

return new FrameSignaturePair(frame, result.getRowSignature());

Contributor:

We are making one frame per result sequence, and each result sequence represents one segment. This does not seem very scalable.
Let's leave a note here so that we can get back to this in a future PR.

Contributor:

Since we can have different row signatures per segment, what we can do is only start a new frame when the row signature changes. This will reduce the number of frames by a lot.
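
A small sketch of that suggestion: group consecutive per-segment results that share a row signature so each group can be written into one frame. The types here are placeholders rather than Druid's actual result and signature classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class SignatureBatcher
{
  // Groups consecutive results that share a row signature into one batch, so that
  // each batch can be written into a single frame instead of one frame per segment.
  public static <T> List<List<T>> batchBySignature(List<T> results, Function<T, String> signatureOf)
  {
    final List<List<T>> batches = new ArrayList<>();
    String currentSignature = null;
    List<T> currentBatch = null;
    for (T result : results) {
      final String signature = signatureOf.apply(result);
      if (currentBatch == null || !Objects.equals(signature, currentSignature)) {
        // Signature changed (or this is the first result): start a new batch.
        currentBatch = new ArrayList<>();
        batches.add(currentBatch);
        currentSignature = signature;
      }
      currentBatch.add(result);
    }
    return batches;
  }
}
```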

@LakshSingla (Contributor Author):

Thanks for the reviews!
Following the latest batch of changes, here are the major things that I have updated:

  1. Created a FrameBasedIndexedTable to build an indexed table on top of a frame-based data source. This also changed the frame type to COLUMNAR, since indexing the table requires reading the columns corresponding to the indexed keys.
  2. Used a memory allocator factory to allow flexibility in converting the inline results to frames.
  3. Added configurations; the fallback code paths still need to be provided.
  4. Refactored the code and made stylistic changes.

@cryptoe cryptoe left a comment

Changes LGTM.
Thanks @LakshSingla !!

{
public enum SubqueryResultLimit
{
ROW_LIMIT,

Contributor:

Please add Javadocs here.

limitAccumulator.addAndGet(frame.getFrame().numRows());
if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
throw ResourceLimitExceededException.withMessage(
"Subquery generated results beyond maximum[%d] bytes",

Contributor:

This method needs Javadocs. Line 730 swallows exceptions in order to fall back; let's document this.

Contributor:

What's the expected action from the user?

Contributor Author:

Updating with a more appropriate error message
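
For reference, a stripped-down sketch of the accumulator check in the hunk quoted above. The class and exception here are stand-ins; the real code uses Druid's ResourceLimitExceededException inside ClientQuerySegmentWalker.

```java
import java.util.concurrent.atomic.AtomicLong;

final class SubqueryByteGuard
{
  private final AtomicLong bytesAccumulated = new AtomicLong();
  private final long memoryLimit;

  SubqueryByteGuard(long memoryLimit)
  {
    this.memoryLimit = memoryLimit;
  }

  // Adds the size of one materialized frame and fails fast once the byte limit is crossed.
  void add(long frameBytes)
  {
    if (bytesAccumulated.addAndGet(frameBytes) >= memoryLimit) {
      throw new IllegalStateException(
          "Subquery generated results beyond maximum[" + memoryLimit + "] bytes"
      );
    }
  }
}
```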

);
}
catch (Exception e) {
return Optional.empty();

Contributor:

Please add a debug log line so that we know the exception here.

Contributor:

why is this a debug log though? It should be WARN.

Contributor Author:

This will be executed per query, so DEBUG is more appropriate in my opinion; otherwise the logs will be cluttered with the exception message. Either we should:

  1. Keep it at DEBUG so that we don't clutter the logs. This has the disadvantage that we won't be able to readily observe when we fall back to the default code path.
  2. Not catch the exception and let it propagate. The user will then report the issue and we can fix it.

The 2nd option means there won't be a fallback in case we aren't able to convert the results to frames. Since this is a newer feature, I think we should still have a fallback until we are confident that we can convert every query; once it is more mature and frames can handle array types (currently they handle string arrays only), we can remove the fallback altogether and let the exception pass through.

Contributor:

Yeah, we shouldn't do the 2nd option. We can do something like the following (sketched below):

  • Log the fallback at INFO.
  • Log the exception stack trace if debug is set in the query context.
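
A sketch of that approach, assuming SLF4J for brevity (Druid has its own Logger wrapper); the method and flag names are illustrative.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubqueryFallbackLogging
{
  private static final Logger LOG = LoggerFactory.getLogger(SubqueryFallbackLogging.class);

  // Logs the fall back to row-based limiting at INFO, attaching the stack trace
  // only when the query context asks for debug output.
  public static void logFallback(Exception e, boolean debugSetInQueryContext)
  {
    if (debugSetInQueryContext) {
      LOG.info("Cannot materialize subquery results as frames; falling back to row-based limiting", e);
    } else {
      LOG.info("Cannot materialize subquery results as frames; falling back to row-based limiting: {}", e.getMessage());
    }
  }
}
```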

}

if (!firstRowWritten) {
throw new ISE("Row size is greater than the frame size.");

Contributor:

Will a user ever see this error message? Please use the DruidException class instead and add an error message that's more actionable.

Contributor Author:

Yes, this can be seen at the top level. Refactored with DruidException and a more actionable error message.


cryptoe commented Jun 26, 2023

Since this PR is liable to break due to merge conflicts, going ahead and merging this.
@LakshSingla Please address the logging feedback from @abhishekagarwal87 as part of a separate PR.

@cryptoe cryptoe merged commit 1647d5f into apache:master Jun 26, 2023
cryptoe commented Jun 26, 2023

Thanks for the contribution @LakshSingla !!

@abhishekagarwal87 abhishekagarwal87 added this to the 27.0 milestone Jul 19, 2023
sergioferragut pushed a commit to sergioferragut/druid that referenced this pull request Jul 21, 2023