Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the subquery results by memory usage #13952

Merged
merged 56 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
3e4935a
Initial commit
LakshSingla Jan 11, 2022
04bb5b8
Merge branch 'master' into broker-mem
LakshSingla Feb 6, 2023
e7f0f9f
initial commit, build inline data source using frames
LakshSingla Mar 17, 2023
b96a671
Merge branch 'master' into broker-mem
LakshSingla Mar 17, 2023
6b119a3
convert rows to frames
LakshSingla Mar 17, 2023
36e1f37
fallback to row based inline data source if cannot convert to frames
LakshSingla Mar 20, 2023
633f83a
spurious copy paste fix
LakshSingla Mar 20, 2023
9cc62d2
revert pom.xml changes
LakshSingla Mar 20, 2023
f3db103
cleanup, feature gate the new config
LakshSingla Mar 20, 2023
038b3b5
add frame generation logic to query and scan toolchests
LakshSingla Mar 22, 2023
96ad351
add IterableBackedInlineDataSource
LakshSingla Mar 22, 2023
74f06cd
add ScanQueryQueryToolChest
LakshSingla Mar 22, 2023
2866de3
1. Rename InlineDataSource to IterableBackedInlineDataSource
LakshSingla Mar 28, 2023
eda4c42
1. Custom serde for frames backed datasource
LakshSingla Apr 3, 2023
8a07da1
remove debugging short circuit
LakshSingla Apr 3, 2023
fd32127
add comments, remove unused classes
LakshSingla Apr 4, 2023
ea56f4f
better comments
LakshSingla Apr 4, 2023
15b49b0
add test for IterableRowsCursor
LakshSingla Apr 5, 2023
8e44ec6
review comments
LakshSingla Apr 6, 2023
b834f8e
add joinable factory, update test cases, remove from the server config
LakshSingla Apr 10, 2023
42d0698
add tests, address review comments
LakshSingla Apr 12, 2023
9bc6b3a
Merge branch 'master' into broker-mem
LakshSingla Apr 12, 2023
fe5d203
refactor
LakshSingla Apr 12, 2023
ac5cc02
fix changes due to upstream
LakshSingla Apr 12, 2023
37e217b
add a concat cursor
LakshSingla Apr 12, 2023
1f17557
fix ClientQuerySegmentWalkerTest
LakshSingla Apr 12, 2023
7e2a052
fix deprecated calls
LakshSingla Apr 12, 2023
f4b87d1
fix static checks, add code coverage
LakshSingla Apr 12, 2023
38a28c0
fix static checks, add code coverage
LakshSingla Apr 12, 2023
0db119a
Trigger Build
LakshSingla Apr 12, 2023
663a62d
query context change in the test case
LakshSingla Apr 14, 2023
bf39600
rename back IterableBackedInlineDataSource to InlineDataSource
LakshSingla Apr 16, 2023
bdab41a
Merge branch 'master' into broker-mem
LakshSingla Apr 16, 2023
1ec37af
review comments, add test cases, fix batching logic
LakshSingla Apr 19, 2023
6acf97a
use peekingiterator, review comments
LakshSingla Apr 24, 2023
4531e4d
add tests, fix edge cases
LakshSingla Apr 27, 2023
50cda6a
build rerun, fix the codeql checks
LakshSingla Apr 28, 2023
7284729
Frame based index table implementation
LakshSingla May 23, 2023
cc99705
review comments
LakshSingla May 23, 2023
7243ce3
review comments - checkstyle, add tests for the indexed table, change…
LakshSingla May 24, 2023
d432dfe
codeql, static checks
LakshSingla May 24, 2023
4e7a540
refactor, test fix
LakshSingla May 29, 2023
27a8566
static check
LakshSingla May 30, 2023
6a45ea9
add server config, refactor the code
LakshSingla May 31, 2023
8b3ab89
test fix
LakshSingla May 31, 2023
1d110b8
Merge branch 'master' into broker-mem
LakshSingla Jun 2, 2023
bc346c9
update server config for testing
LakshSingla Jun 2, 2023
235fdb3
build fix
LakshSingla Jun 2, 2023
755a41d
ignore a failing test, refactors and logs
LakshSingla Jun 19, 2023
0f60d97
static check
LakshSingla Jun 19, 2023
0feebee
static check
LakshSingla Jun 21, 2023
7f62179
Merge branch 'master' into broker-mem
LakshSingla Jun 21, 2023
97a8197
build fix
LakshSingla Jun 21, 2023
92c3b52
docs
LakshSingla Jun 26, 2023
4d7b3c4
spelling and review comments
LakshSingla Jun 26, 2023
dd3e140
limit frame size
LakshSingla Jun 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

/**
* Reads {@link InlineInputSlice} using {@link SegmentWrangler} (which is expected to contain an
* {@link org.apache.druid.segment.InlineSegmentWrangler}).
* {@link org.apache.druid.segment.IterableBasedInlineSegmentWrangler}).
*/
public class InlineInputSliceReader implements InputSliceReader
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,6 @@ public void testUnplannableQueries()

}

@Ignore
@Override
public void testMaxSubqueryRows()
{

}

@Ignore
@Override
public void testQueryWithMoreThanMaxNumericInFilter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.ComplexMetrics;

Expand Down Expand Up @@ -57,10 +58,15 @@ private FieldWriters()
public static FieldWriter create(
final ColumnSelectorFactory columnSelectorFactory,
final String columnName,
final ColumnType columnType
final ColumnType columnType,
final boolean allowNullColumnType
LakshSingla marked this conversation as resolved.
Show resolved Hide resolved
)
{
if (columnType == null) {
// Returning Complex<Json> writer since we do not know the type of column.
if (allowNullColumnType) {
return makeComplexWriter(columnSelectorFactory, columnName, NestedDataComplexTypeSerde.TYPE_NAME);
LakshSingla marked this conversation as resolved.
Show resolved Hide resolved
}
throw new UnsupportedColumnTypeException(columnName, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,14 @@ private FrameReader(
/**
* Create a reader for frames with a given {@link RowSignature}. The signature must exactly match the frames to be
* read, or else behavior is undefined.
* If the columnType is null, we store the data as {@link ColumnType#NESTED_DATA}. This can be done if we know that
* the data that we receive can be serded generically using the nested data. It is currently used in the brokers to
* store the data with unknown types into frames.
* @param signature signature used to generate the reader
* @param allowNullTypes if ColumnTypes can be null. The column types that are null would be interpreted
* as Complex JSON types
*/
public static FrameReader create(final RowSignature signature)
public static FrameReader create(final RowSignature signature, final boolean allowNullTypes)
{
// Double-check that the frame does not have any disallowed field names. Generally, we expect this to be
// caught on the write side, but we do it again here for safety.
Expand All @@ -85,12 +91,17 @@ public static FrameReader create(final RowSignature signature)
final List<FieldReader> fieldReaders = new ArrayList<>(signature.size());

for (int columnNumber = 0; columnNumber < signature.size(); columnNumber++) {
final ColumnType columnType =
Preconditions.checkNotNull(
signature.getColumnType(columnNumber).orElse(null),
"Type for column [%s]",
signature.getColumnName(columnNumber)
);
ColumnType columnType;
if (!allowNullTypes) {
LakshSingla marked this conversation as resolved.
Show resolved Hide resolved
columnType =
Preconditions.checkNotNull(
signature.getColumnType(columnNumber).orElse(null),
"Type for column [%s]",
signature.getColumnName(columnNumber)
);
} else {
columnType = signature.getColumnType(columnNumber).orElse(ColumnType.NESTED_DATA);
LakshSingla marked this conversation as resolved.
Show resolved Hide resolved
}

columnReaders.add(FrameColumnReaders.create(columnNumber, columnType));
fieldReaders.add(FieldReaders.create(signature.getColumnName(columnNumber), columnType));
Expand All @@ -99,6 +110,11 @@ public static FrameReader create(final RowSignature signature)
return new FrameReader(signature, columnReaders, fieldReaders);
}

public static FrameReader create(final RowSignature signature)
{
return create(signature, false);
}

public RowSignature signature()
{
return signature;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,18 @@

package org.apache.druid.frame.segment;

import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.filter.BoundFilter;
import org.apache.druid.segment.filter.Filters;
Expand Down Expand Up @@ -67,4 +75,54 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval)
);
}
}

/**
* Writes a {@link Cursor} to a {@link Frame}. This method iterates over the rows of the cursor, and writes the columns
* to the cursor
* @param cursor Cursor to write to the frame
* @param frameWriterFactory Frame writer factory to write to the frame.
* Determines the signature of the rows that are written to the frames
* @param memoryLimitBytes Limit in bytes, if needs to be enforced while converting the cursor to the frame. If adding
* a row causes the frame size to exceed this limit, we throw an {@link ResourceLimitExceededException}
*/
public static Frame cursorToFrame(
Cursor cursor,
FrameWriterFactory frameWriterFactory,
@Nullable Long memoryLimitBytes
)
{
Frame frame;

try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
while (!cursor.isDone()) {
if (!frameWriter.addSelection()) {
throw new ISE(
"Unable to append row to the frame. Allocator capacity: [%s]",
frameWriterFactory.allocatorCapacity()
);
}

if (memoryLimitBytes != null && memoryLimitBytes < frameWriter.getTotalSize()) {
throw new ResourceLimitExceededException(
StringUtils.format(
"Exceeded total bytes allocated for this subquery. "
+ "Current size [%d], total row count [%d], allocated size [%d]. "
+ "Please limit the amount of data that the results occupy, or increase the limit using the context "
+ "parameter [%s]",
frameWriter.getTotalSize(),
frameWriter.getNumRows(),
memoryLimitBytes,
QueryContexts.MAX_SUBQUERY_MEMORY_BYTES_KEY
)
);
}

cursor.advance();
}

frame = Frame.wrap(frameWriter.toByteArray());
}

return frame;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nonnull;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -53,24 +54,38 @@ private FrameWriters()
* @param signature signature of the frames
* @param sortColumns sort columns for the frames. If nonempty, {@link FrameSort#sort} is used to sort the
* resulting frames.
* @param allowNullColumnTypes to allow null ColumnType in the signature. This should only be enabled when the user
* knows that the column objects exist as native Java POJOs (LinkedList, Maps etc), which
* can be serded using the Druid's nested columns
*/
public static FrameWriterFactory makeFrameWriterFactory(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO adding allowNullColumnTypes as a parameter here isn't the right approach. Better for the caller to replace all the unknown types (null) in signature with COMPLEX<json>, if that's what's desired. It keeps the frame writer code simpler, easier to test/understand, etc.

final FrameType frameType,
final MemoryAllocatorFactory allocatorFactory,
final RowSignature signature,
final List<KeyColumn> sortColumns
@Nonnull final RowSignature signature,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use @Nullable where appropriate, but avoid @Nonnull on parameters in general, since it creates an implication that unannotated things can be null. But, they mostly can't, so it's misleading.

If you're looking for benefit for static analyzers, better to use @EverythingIsNonnullByDefault at package level through package-info.java. We have that on various packages already and could add it to more as desired. That's like annotating every un-annotated parameter with @Nonnull, without the clutter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the pointer! Will fix the clutter.

final List<KeyColumn> sortColumns,
final boolean allowNullColumnTypes
)
{
switch (Preconditions.checkNotNull(frameType, "frameType")) {
case COLUMNAR:
return new ColumnarFrameWriterFactory(allocatorFactory, signature, sortColumns);
return new ColumnarFrameWriterFactory(allocatorFactory, signature, sortColumns, allowNullColumnTypes);
case ROW_BASED:
return new RowBasedFrameWriterFactory(allocatorFactory, signature, sortColumns);
return new RowBasedFrameWriterFactory(allocatorFactory, signature, sortColumns, allowNullColumnTypes);
default:
throw new ISE("Unrecognized frame type [%s]", frameType);
}
}

public static FrameWriterFactory makeFrameWriterFactory(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method seems weird. You can call the base method directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this as a separate method originally because we don't require the boolean in the rest of the cases (i.e. the original ones in MSQ). Therefore it made sense to me to hide this complexity from the callers of this method that arent residing in the broker.

final FrameType frameType,
final MemoryAllocatorFactory allocatorFactory,
final RowSignature signature,
final List<KeyColumn> sortColumns
)
{
return makeFrameWriterFactory(frameType, allocatorFactory, signature, sortColumns, false);
}

/**
* Returns a copy of "signature" with columns rearranged so the provided sortColumns appear as a prefix.
* Throws an error if any of the sortColumns are not present in the input signature, or if any of their
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,19 @@ public class RowBasedFrameWriterFactory implements FrameWriterFactory
private final MemoryAllocatorFactory allocatorFactory;
private final RowSignature signature;
private final List<KeyColumn> sortColumns;
private final boolean allowNullColumnTypes;

public RowBasedFrameWriterFactory(
final MemoryAllocatorFactory allocatorFactory,
final RowSignature signature,
final List<KeyColumn> sortColumns
final List<KeyColumn> sortColumns,
final boolean allowNullColumnTypes
)
{
this.allocatorFactory = allocatorFactory;
this.signature = signature;
this.sortColumns = sortColumns;
this.allowNullColumnTypes = allowNullColumnTypes;

FrameWriterUtils.verifySortColumns(sortColumns, signature);
}
Expand All @@ -70,7 +73,7 @@ public FrameWriter newFrameWriter(final ColumnSelectorFactory columnSelectorFact
return new RowBasedFrameWriter(
signature,
sortColumns,
makeFieldWriters(columnSelectorFactory),
makeFieldWriters(columnSelectorFactory, allowNullColumnTypes),
FrameReaderUtils.makeRowMemorySupplier(columnSelectorFactory, signature),
rowOrderMemory,
rowOffsetMemory,
Expand Down Expand Up @@ -102,7 +105,7 @@ public FrameType frameType()
* The returned {@link FieldWriter} objects are not thread-safe, and should only be used with a
* single frame writer.
*/
private List<FieldWriter> makeFieldWriters(final ColumnSelectorFactory columnSelectorFactory)
private List<FieldWriter> makeFieldWriters(final ColumnSelectorFactory columnSelectorFactory, final boolean allowNullColumnTypes)
{
final List<FieldWriter> fieldWriters = new ArrayList<>();

Expand All @@ -111,7 +114,7 @@ private List<FieldWriter> makeFieldWriters(final ColumnSelectorFactory columnSel
final String column = signature.getColumnName(i);
// note: null type won't work, but we'll get a nice error from FrameColumnWriters.create
final ColumnType columnType = signature.getColumnType(i).orElse(null);
fieldWriters.add(FieldWriters.create(columnSelectorFactory, column, columnType));
fieldWriters.add(FieldWriters.create(columnSelectorFactory, column, columnType, allowNullColumnTypes));
}
}
catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ColumnarFrameWriterFactory implements FrameWriterFactory
private final MemoryAllocatorFactory allocatorFactory;
private final RowSignature signature;
private final List<KeyColumn> keyColumns;
private final boolean allowNullTypes;

/**
* Create a ColumnarFrameWriterFactory.
Expand All @@ -57,12 +58,14 @@ public class ColumnarFrameWriterFactory implements FrameWriterFactory
public ColumnarFrameWriterFactory(
final MemoryAllocatorFactory allocatorFactory,
final RowSignature signature,
final List<KeyColumn> keyColumns
final List<KeyColumn> keyColumns,
final boolean allowNullTypes
)
{
this.allocatorFactory = Preconditions.checkNotNull(allocatorFactory, "allocatorFactory");
this.signature = signature;
this.keyColumns = Preconditions.checkNotNull(keyColumns, "sortColumns");
this.allowNullTypes = allowNullTypes;

if (!keyColumns.isEmpty()) {
throw new IAE("Columnar frames cannot be sorted");
Expand All @@ -86,7 +89,7 @@ public FrameWriter newFrameWriter(final ColumnSelectorFactory columnSelectorFact
final String column = signature.getColumnName(i);
// note: null type won't work, but we'll get a nice error from FrameColumnWriters.create
final ColumnType columnType = signature.getColumnType(i).orElse(null);
columnWriters.add(FrameColumnWriters.create(columnSelectorFactory, allocator, column, columnType));
columnWriters.add(FrameColumnWriters.create(columnSelectorFactory, allocator, column, columnType, allowNullTypes));
}
}
catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.ComplexMetrics;

Expand Down Expand Up @@ -56,10 +57,15 @@ static FrameColumnWriter create(
final ColumnSelectorFactory columnSelectorFactory,
final MemoryAllocator allocator,
final String column,
final ColumnType type
final ColumnType type,
final boolean allowNullType
)
{
if (type == null) {
if (allowNullType) {
// Serde unknown types as complex
return makeComplexWriter(columnSelectorFactory, allocator, column, NestedDataComplexTypeSerde.TYPE_NAME);
}
throw new UnsupportedColumnTypeException(column, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.FramesBackedInlineDataSource;
import org.apache.druid.query.FramesBackedInlineDataSourceSerializer;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContextDeserializer;
import org.joda.time.DateTimeZone;
Expand All @@ -52,6 +54,8 @@ public DruidDefaultSerializersModule()

JodaStuff.register(this);

addSerializer(FramesBackedInlineDataSource.class, new FramesBackedInlineDataSourceSerializer());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you let us know the reason why do you think adding the serializer here makes sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would think we don't need to serialize this, as it should exist in-memory only. So I'm also wondering where this was needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This serialization is required when the broker inlines the subquery results and sends the inlined query to the historicals. In that case, we serialize the Frames and FrameBasedInlineDatasource to behave as if an equivalent InlineDatasource (based on rows) would have been serialized.


addDeserializer(
DateTimeZone.class,
new JsonDeserializer<DateTimeZone>()
Expand Down
Loading