Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Materialize scan results correctly when columns are not present in the segments #16619

Merged
merged 7 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
Expand All @@ -36,6 +39,7 @@
import org.apache.druid.query.FrameSignaturePair;
import org.apache.druid.query.IterableRowsCursorHelper;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import java.io.Closeable;
Expand Down Expand Up @@ -157,10 +161,35 @@ private static class ScanResultValueFramesIterator implements Iterator<FrameSign
Cursor currentCursor = null;

/**
* Row signature of the current row
* Rows in the List form. The {@link #currentCursor} is a wrapper over these rows
*/
List<Object[]> currentRows = null;

/**
* Row index pointing to the current row in {@link #currentRows}. This is the exact same row that the {@link #currentCursor}
* is also pointing at. Therefore {@link #currentRows} + {@link #currentCursor} represent the same information as presented
* by {@link #currentCursor}.
*/
int currentRowIndex = -1;

/**
* Row signature of the current cursor. This is used to create the cursor out of the ScanResultValue. We have to use
* the full signature because the ScanResultValue will have
*/
RowSignature currentRowSignature = null;

/**
* Row signature of the current cursor, with columns having unknown (null) types trimmed out. This is used to write
* the rows onto the frame. There's an implicit assumption (that we verify), that columns with null typed only
* contain null values, because the underlying segment didn't have the column.
*/
RowSignature trimmedRowSignature = null;
LakshSingla marked this conversation as resolved.
Show resolved Hide resolved

/**
* Columns of the currentRows with missing type information. As we materialize the rows onto the frames, we also
* verify that these columns only contain null values.
*/
IntList nullTypedColumns = null;

public ScanResultValueFramesIterator(
Sequence<ScanResultValue> resultSequence,
Expand Down Expand Up @@ -200,26 +229,33 @@ public FrameSignaturePair next()
// start all the processing
populateCursor();
boolean firstRowWritten = false;
// While calling populateCursor() repeatedly, currentRowSignature might change. Therefore we store the signature
// While calling populateCursor() repeatedly, currentRowSignature might change. Therefore, we store the signature
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

....what if the signature changes - is that a problem? shouldn't that be an Exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there are two cursors, CursorA with RowSignatureA and CursorB with RowSignatureB and the cursor is at the last row of CursorA, populate call will return false, i.e. the two cursors cannot be batched together, and set currentRowSignature to the RowSignatureB (i.e. prepare the variables for the next write). We still want to return the old frame with the old signature therefore we need to preserve the signature with which we have written the frame.
Per your previous suggestion, frameWriterFactory.signature() would be sufficient and cleaner, and I will use that instead.

// with which we have written the frames
final RowSignature writtenSignature = currentRowSignature;
final RowSignature writtenSignature = trimmedRowSignature;
LakshSingla marked this conversation as resolved.
Show resolved Hide resolved
FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(
FrameType.COLUMNAR,
memoryAllocatorFactory,
currentRowSignature,
trimmedRowSignature,
Collections.emptyList()
);
Frame frame;
try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(new SettableCursorColumnSelectorFactory(
() -> currentCursor,
currentRowSignature
))) {
try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(
new SettableCursorColumnSelectorFactory(() -> currentCursor, currentRowSignature))) {
while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row
if (!frameWriter.addSelection()) { // Add the cursor's row to the frame, till the frame is full
break;
}

for (Integer columnNumber : nullTypedColumns) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: I wonder why use a fastutil IntList - if it gets iterated with a foreach ; plain get?
this could be moved into some method like validateRow - that will naturally do a CSE of the currentRows.get(currentRowIndex) so that it will be only evaluated once

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason to use FastUtil IntList as such. I just thought it might be faster to create than an arraylist.

this could be moved into some method like validateRow - that will naturally do a CSE of the currentRows.get(currentRowIndex) so that it will be only evaluated once

It is getting evaluated once here right? Unless I misinterpreted your comment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was just a note; this loop is validating one row; but to access that it has to do a function call currentRows.get(currentRowIndex) ; which became part of the loop body - moving it into a method could make it clear that it works on a row - and it will naturally remove the currentRows.get(currentRowIndex) as that's the row :)

if (currentRows.get(currentRowIndex)[columnNumber] != null) {
throw DruidException.defensive("Expected a null value");
}
}

firstRowWritten = true;
// Check that the columns with the null types are actually null before advancing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: isn't this comment misplaced? (note: this detail is not necessary - but it could live as an apidoc of the validateRow if that would be around)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned up the code

currentCursor.advance();
currentRowIndex++;
}

if (!firstRowWritten) {
Expand Down Expand Up @@ -257,7 +293,9 @@ private boolean done()
* if (hasNext()) was true before calling the method -
* 1. {@link #currentCursor} - Points to the cursor with non-empty value (i.e. isDone()) is false, and the cursor points
* to the next row present in the sequence of the scan result values. This row would get materialized to frame
* 2. {@link #currentRowSignature} - Row signature of the row.
* 2. {@link #currentRowSignature} - Row signature of the row
* 3. {@link #currentRows} - Points to the group of rows underlying the currentCursor
* 4. {@link #currentRowIndex} - Reset to 0 if we modified the cursor, else untouched
* <p>
* Return value -
* if (hasNext()) is false before calling the method - returns false
Expand All @@ -276,19 +314,36 @@ private boolean populateCursor()
// At this point, we know that we need to move to the next non-empty cursor, AND it exists, because
// done() is not false
ScanResultValue scanResultValue = resultSequenceIterator.next();

final RowSignature rowSignature = scanResultValue.getRowSignature() != null
? scanResultValue.getRowSignature()
: defaultRowSignature;

RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;

// currentRowSignature at this time points to the previous row's signature
final boolean compatible = modifiedRowSignature != null
&& modifiedRowSignature.equals(currentRowSignature);
IntList currentNullTypedColumns = new IntArrayList();
RowSignature.Builder modifiedTrimmedRowSignatureBuilder = RowSignature.builder();

for (int i = 0; i < modifiedRowSignature.size(); ++i) {
ColumnType columnType = modifiedRowSignature.getColumnType(i).orElse(null);
if (columnType == null) {
currentNullTypedColumns.add(i);
} else {
modifiedTrimmedRowSignatureBuilder.add(modifiedRowSignature.getColumnName(i), columnType);
}
}

RowSignature modifiedTrimmedRowSignature = modifiedTrimmedRowSignatureBuilder.build();

// currentRowSignature at this time points to the previous row's signature. We look at the trimmed signature
// because that is the one used to write onto the frames, and if two rows have same trimmed signature, we can
// write both the rows onto the same frame
final boolean compatible = modifiedTrimmedRowSignature.equals(trimmedRowSignature);

final List rows = (List) scanResultValue.getEvents();
final Iterable<Object[]> formattedRows = Lists.newArrayList(Iterables.transform(
final List<Object[]> formattedRows = Lists.newArrayList(Iterables.transform(
rows,
(Function) resultFormatMapper.apply(modifiedRowSignature)
));
Expand All @@ -307,6 +362,12 @@ private boolean populateCursor()
}

currentRowSignature = modifiedRowSignature;
trimmedRowSignature = modifiedTrimmedRowSignature;
nullTypedColumns = currentNullTypedColumns;
currentRows = formattedRows;
currentRowIndex = 0;


return compatible;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.FrameBasedInlineDataSource;
import org.apache.druid.query.FrameSignaturePair;
Expand Down Expand Up @@ -55,6 +57,18 @@ public class ScanResultValueFramesIterableTest extends InitializedNullHandlingTe
.add("col2", ColumnType.LONG)
.build();

private static final RowSignature SIGNATURE3 = RowSignature.builder()
.add("col1", ColumnType.DOUBLE)
.add("col2", ColumnType.LONG)
.add("col3", null)
.build();

private static final RowSignature SIGNATURE4 = RowSignature.builder()
.add("col1", ColumnType.DOUBLE)
.add("col3", null)
.add("col2", ColumnType.LONG)
.build();


@Test
public void testEmptySequence()
Expand Down Expand Up @@ -191,6 +205,32 @@ public void testBatchingWithHeterogenousScanResultValues()
);
}

@Test
public void testBatchingWithHeterogenousScanResultValuesAndNullTypes()
{
List<FrameSignaturePair> frames = Lists.newArrayList(
createIterable(
scanResultValue1(2),
scanResultValue3(2)
)
);
Assert.assertEquals(2, frames.size());
QueryToolChestTestHelper.assertArrayResultsEquals(
ImmutableList.of(
new Object[]{1L, 1.0D},
new Object[]{2L, 2.0D}
),
new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence()
);
QueryToolChestTestHelper.assertArrayResultsEquals(
ImmutableList.of(
new Object[]{5.0D, 5L},
new Object[]{6.0D, 6L}
),
new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence()
);
}

@Test
public void testBatchingWithHeterogenousAndEmptyScanResultValues()
{
Expand Down Expand Up @@ -222,6 +262,71 @@ public void testBatchingWithHeterogenousAndEmptyScanResultValues()
);
}

@Test
public void testBatchingWithHeterogenousAndEmptyScanResultValuesAndNullTypes()
{
List<FrameSignaturePair> frames = Lists.newArrayList(
createIterable(
scanResultValue1(0),
scanResultValue2(0),
scanResultValue1(2),
scanResultValue1(0),
scanResultValue2(2),
scanResultValue2(0),
scanResultValue2(0)
)
);
Assert.assertEquals(2, frames.size());
QueryToolChestTestHelper.assertArrayResultsEquals(
ImmutableList.of(
new Object[]{1L, 1.0D},
new Object[]{2L, 2.0D}
),
new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence()
);
QueryToolChestTestHelper.assertArrayResultsEquals(
ImmutableList.of(
new Object[]{3.0D, 3L},
new Object[]{4.0D, 4L}
),
new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence()
);
}

@Test
public void testBatchingWithDifferentRowSignaturesButSameTrimmedRowSignature()
{
List<FrameSignaturePair> frames = Lists.newArrayList(
createIterable(
scanResultValue3(0),
scanResultValue4(0),
scanResultValue3(2),
scanResultValue3(0),
scanResultValue4(2),
scanResultValue4(0),
scanResultValue3(0)
)
);
Assert.assertEquals(1, frames.size());
QueryToolChestTestHelper.assertArrayResultsEquals(
ImmutableList.of(
new Object[]{5.0D, 5L},
new Object[]{6.0D, 6L},
new Object[]{7.0D, 7L},
new Object[]{8.0D, 8L}
),
new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE2).getRowsAsSequence()
LakshSingla marked this conversation as resolved.
Show resolved Hide resolved
);
}

@Test
public void testExceptionThrownWithMissingType()
{
Sequence<FrameSignaturePair> frames = Sequences.simple(createIterable(incompleteTypeScanResultValue(1)));
Assert.assertThrows(DruidException.class, frames::toList);
}


@Test
public void testSplitting()
{
Expand Down Expand Up @@ -267,4 +372,37 @@ private static ScanResultValue scanResultValue2(int numRows)
SIGNATURE2
);
}

// Signature: col1: DOUBLE, col2: LONG, col3: null
private static ScanResultValue scanResultValue3(int numRows)
{
return new ScanResultValue(
"dummy",
ImmutableList.of("col1", "col2", "col3"),
IntStream.range(5, 5 + numRows).mapToObj(i -> new Object[]{(double) i, i, null}).collect(Collectors.toList()),
SIGNATURE3
);
}

// Signature: col1: DOUBLE, col3: null, col2: LONG
private static ScanResultValue scanResultValue4(int numRows)
{
return new ScanResultValue(
"dummy",
ImmutableList.of("col1", "col3", "col2"),
IntStream.range(7, 7 + numRows).mapToObj(i -> new Object[]{(double) i, null, i}).collect(Collectors.toList()),
SIGNATURE4
);
}

// Contains ScanResultValue with incomplete type, and non-null row
private static ScanResultValue incompleteTypeScanResultValue(int numRows)
{
return new ScanResultValue(
"dummy",
ImmutableList.of("col1", "col3", "col2"),
IntStream.range(7, 7 + numRows).mapToObj(i -> new Object[]{(double) i, i, i}).collect(Collectors.toList()),
LakshSingla marked this conversation as resolved.
Show resolved Hide resolved
SIGNATURE4
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ public static void setupNullValues()
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
.put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000")
// Disallows the fallback to row based limiting
.put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "1")
.build();

// Add additional context to the given context map for when the
Expand Down
Loading
Loading