Skip to content

Commit

Permalink
Materialize scan results correctly when columns are not present in th…
Browse files Browse the repository at this point in the history
…e segments (#16619)

Fixes a bug causing maxSubqueryBytes not to work when segments have missing columns.
  • Loading branch information
LakshSingla authored Jun 23, 2024
1 parent a63c12b commit 00c9643
Show file tree
Hide file tree
Showing 4 changed files with 466 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
Expand All @@ -36,6 +39,7 @@
import org.apache.druid.query.FrameSignaturePair;
import org.apache.druid.query.IterableRowsCursorHelper;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import java.io.Closeable;
Expand Down Expand Up @@ -157,17 +161,41 @@ private static class ScanResultValueFramesIterator implements Iterator<FrameSign
Cursor currentCursor = null;

/**
* Row signature of the current row
* Rows in the List form. The {@link #currentCursor} is a wrapper over these rows
*/
RowSignature currentRowSignature = null;
List<Object[]> currentRows = null;

/**
* Row index pointing to the current row in {@link #currentRows}. This is the exact same row that the {@link #currentCursor}
* is also pointing at. Therefore {@link #currentRows} + {@link #currentCursor} represent the same information as presented
* by {@link #currentCursor}.
*/
int currentRowIndex = -1;

/**
* Full row signature of the ScanResultValue, used to extract the rows out of it.
*/
RowSignature currentInputRowSignature = null;

/**
* Row signature of the ScanResultValue, with columns having unknown (null) types trimmed out. This is used to write
* the rows onto the frame. There's an implicit assumption (that we verify), that columns with null typed only
* contain null values, because the underlying segment didn't have the column.
*/
RowSignature currentOutputRowSignature = null;

/**
* Columns of the currentRows with missing type information. As we materialize the rows onto the frames, we also
* verify that these columns only contain null values.
*/
IntList nullTypedColumns = null;

public ScanResultValueFramesIterator(
Sequence<ScanResultValue> resultSequence,
MemoryAllocatorFactory memoryAllocatorFactory,
boolean useNestedForUnknownTypes,
RowSignature defaultRowSignature,
Function<RowSignature, Function<?, Object[]>> resultFormatMapper
final Sequence<ScanResultValue> resultSequence,
final MemoryAllocatorFactory memoryAllocatorFactory,
final boolean useNestedForUnknownTypes,
final RowSignature defaultRowSignature,
final Function<RowSignature, Function<?, Object[]>> resultFormatMapper
)
{
this.memoryAllocatorFactory = memoryAllocatorFactory;
Expand Down Expand Up @@ -200,26 +228,35 @@ public FrameSignaturePair next()
// start all the processing
populateCursor();
boolean firstRowWritten = false;
// While calling populateCursor() repeatedly, currentRowSignature might change. Therefore we store the signature
// with which we have written the frames
final RowSignature writtenSignature = currentRowSignature;
FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(

final FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(
FrameType.COLUMNAR,
memoryAllocatorFactory,
currentRowSignature,
currentOutputRowSignature,
Collections.emptyList()
);
Frame frame;
try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(new SettableCursorColumnSelectorFactory(
() -> currentCursor,
currentRowSignature
))) {
final Frame frame;
try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(
new SettableCursorColumnSelectorFactory(() -> currentCursor, currentInputRowSignature))) {
while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row
if (!frameWriter.addSelection()) { // Add the cursor's row to the frame, till the frame is full
break;
}

// Check that the columns with the null types are actually null before advancing
final Object[] currentRow = currentRows.get(currentRowIndex);
for (Integer columnNumber : nullTypedColumns) {
if (currentRow[columnNumber] != null) {
throw DruidException.defensive(
"Expected a null value for column [%s]",
frameWriterFactory.signature().getColumnName(columnNumber)
);
}
}

firstRowWritten = true;
currentCursor.advance();
currentRowIndex++;
}

if (!firstRowWritten) {
Expand All @@ -228,7 +265,9 @@ public FrameSignaturePair next()
frame = Frame.wrap(frameWriter.toByteArray());
}

return new FrameSignaturePair(frame, writtenSignature);
// While calling populateCursor() repeatedly, currentRowSignature might change. Therefore, we store the signature
// with which we have written the frames
return new FrameSignaturePair(frame, frameWriterFactory.signature());
}

/**
Expand All @@ -244,7 +283,7 @@ private boolean done()

/**
* This is the most important method of this iterator. This determines if two consecutive scan result values can
* be batched or not, populates the value of the {@link #currentCursor} and {@link #currentRowSignature},
* be batched or not, populates the value of the {@link #currentCursor} and {@link #currentInputRowSignature},
* during the course of the iterator, and facilitates the {@link #next()}
* <p>
* Multiple calls to populateCursor, without advancing the {@link #currentCursor} is idempotent. This allows successive
Expand All @@ -257,7 +296,9 @@ private boolean done()
* if (hasNext()) was true before calling the method -
* 1. {@link #currentCursor} - Points to the cursor with non-empty value (i.e. isDone()) is false, and the cursor points
* to the next row present in the sequence of the scan result values. This row would get materialized to frame
* 2. {@link #currentRowSignature} - Row signature of the row.
* 2. {@link #currentInputRowSignature} - Row signature of the row
* 3. {@link #currentRows} - Points to the group of rows underlying the currentCursor
* 4. {@link #currentRowIndex} - Reset to 0 if we modified the cursor, else untouched
* <p>
* Return value -
* if (hasNext()) is false before calling the method - returns false
Expand All @@ -275,25 +316,42 @@ private boolean populateCursor()

// At this point, we know that we need to move to the next non-empty cursor, AND it exists, because
// done() is not false
ScanResultValue scanResultValue = resultSequenceIterator.next();
final ScanResultValue scanResultValue = resultSequenceIterator.next();

final RowSignature rowSignature = scanResultValue.getRowSignature() != null
? scanResultValue.getRowSignature()
: defaultRowSignature;
RowSignature modifiedRowSignature = useNestedForUnknownTypes

final RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;

// currentRowSignature at this time points to the previous row's signature
final boolean compatible = modifiedRowSignature != null
&& modifiedRowSignature.equals(currentRowSignature);
final IntList currentNullTypedColumns = new IntArrayList();
final RowSignature.Builder modifiedTrimmedRowSignatureBuilder = RowSignature.builder();

for (int i = 0; i < modifiedRowSignature.size(); ++i) {
ColumnType columnType = modifiedRowSignature.getColumnType(i).orElse(null);
if (columnType == null) {
currentNullTypedColumns.add(i);
} else {
modifiedTrimmedRowSignatureBuilder.add(modifiedRowSignature.getColumnName(i), columnType);
}
}

final RowSignature modifiedTrimmedRowSignature = modifiedTrimmedRowSignatureBuilder.build();

// currentRowSignature at this time points to the previous row's signature. We look at the trimmed signature
// because that is the one used to write onto the frames, and if two rows have same trimmed signature, we can
// write both the rows onto the same frame
final boolean compatible = modifiedTrimmedRowSignature.equals(currentOutputRowSignature);

final List rows = (List) scanResultValue.getEvents();
final Iterable<Object[]> formattedRows = Lists.newArrayList(Iterables.transform(
final List<Object[]> formattedRows = Lists.newArrayList(Iterables.transform(
rows,
(Function) resultFormatMapper.apply(modifiedRowSignature)
));

Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable(
final Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable(
formattedRows,
modifiedRowSignature
);
Expand All @@ -306,7 +364,13 @@ private boolean populateCursor()
return populateCursor();
}

currentRowSignature = modifiedRowSignature;
currentInputRowSignature = modifiedRowSignature;
currentOutputRowSignature = modifiedTrimmedRowSignature;
nullTypedColumns = currentNullTypedColumns;
currentRows = formattedRows;
currentRowIndex = 0;


return compatible;
}
}
Expand Down
Loading

0 comments on commit 00c9643

Please sign in to comment.