Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SuperSorter: direct merging, increased parallelism. #16775

Merged
merged 1 commit into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.segment.FrameCursor;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;

import javax.annotation.Nullable;
import java.io.IOException;
Expand Down Expand Up @@ -73,6 +73,17 @@ public class FrameChannelMerger implements FrameProcessor<Long>
// ColumnSelectorFactory that always reads from the current row in the merged sequence.
final MultiColumnSelectorFactory mergedColumnSelectorFactory;

/**
* @param inputChannels readable frame channels. Each channel must be sorted (i.e., if all frames in the channel
* are concatenated, the concatenated result must be fully sorted).
* @param frameReader reader for frames
* @param outputChannel writable channel to receive the merge-sorted data
* @param frameWriterFactory writer for frames
* @param sortKey sort key for input and output frames
* @param partitions partitions for output frames. If non-null, output frames are written with
* {@link FrameWithPartition#partition()} set according to this parameter
* @param rowLimit maximum number of rows to write to the output channel
*/
public FrameChannelMerger(
final List<ReadableFrameChannel> inputChannels,
final FrameReader frameReader,
Expand All @@ -91,9 +102,8 @@ public FrameChannelMerger(
partitions == null ? ClusterByPartitions.oneUniversalPartition() : partitions;

if (!partitionsToUse.allAbutting()) {
// Sanity check: we lack logic in FrameMergeIterator for not returning every row in the provided frames, so make
// sure there are no holes in partitionsToUse. Note that this check isn't perfect, because rows outside the
// min / max value of partitionsToUse can still appear. But it's a cheap check, and it doesn't hurt to do it.
// To simplify merging logic, when reading frames we only look at the earliest and latest key in "partitions". To
// ensure correctness, we need to verify that there are no gaps.
throw new IAE("Partitions must all abut each other");
}

Expand Down Expand Up @@ -124,9 +134,9 @@ public FrameChannelMerger(
return -1;
} else {
return currentFrames[k1].comparisonWidget.compare(
currentFrames[k1].rowNumber,
currentFrames[k1].rowNumber(),
currentFrames[k2].comparisonWidget,
currentFrames[k2].rowNumber
currentFrames[k2].rowNumber()
);
}
}
Expand Down Expand Up @@ -193,13 +203,13 @@ private FrameWithPartition nextFrame()

if (currentPartitionEnd != null) {
final FramePlus currentFrame = currentFrames[currentChannel];
if (currentFrame.comparisonWidget.compare(currentFrame.rowNumber, currentPartitionEnd) >= 0) {
if (currentFrame.comparisonWidget.compare(currentFrame.rowNumber(), currentPartitionEnd) >= 0) {
// Current key is past the end of the partition. Advance currentPartition til it matches the current key.
do {
currentPartition++;
currentPartitionEnd = partitions.get(currentPartition).getEnd();
} while (currentPartitionEnd != null
&& currentFrame.comparisonWidget.compare(currentFrame.rowNumber, currentPartitionEnd) >= 0);
&& currentFrame.comparisonWidget.compare(currentFrame.rowNumber(), currentPartitionEnd) >= 0);

if (mergedFrameWriter.getNumRows() == 0) {
// Fall through: keep reading into the new partition.
Expand Down Expand Up @@ -229,9 +239,9 @@ private FrameWithPartition nextFrame()
} else {
// Continue reading the currentChannel.
final FramePlus channelFramePlus = currentFrames[currentChannel];
channelFramePlus.advance();
channelFramePlus.cursor.advance();

if (channelFramePlus.cursor.isDone()) {
if (channelFramePlus.isDone()) {
// Done reading current frame from "channel".
// Clear it and see if there is another one available for immediate loading.
currentFrames[currentChannel] = null;
Expand All @@ -242,8 +252,15 @@ private FrameWithPartition nextFrame()
if (channel.canRead()) {
// Read next frame from this channel.
final Frame frame = channel.read();
currentFrames[currentChannel] = new FramePlus(frame, frameReader, sortKey);
remainingChannels++;
final FramePlus framePlus = makeFramePlus(frame, frameReader);
if (framePlus.isDone()) {
// Nothing to read in this frame. Not finished; we can't continue.
// Finish up the current frame and return it.
break;
} else {
currentFrames[currentChannel] = framePlus;
remainingChannels++;
}
} else if (channel.isFinished()) {
// Done reading this channel. Fall through and continue with other channels.
} else {
Expand Down Expand Up @@ -284,8 +301,13 @@ private IntSet populateCurrentFramesAndTournamentTree()

if (channel.canRead()) {
final Frame frame = channel.read();
currentFrames[i] = new FramePlus(frame, frameReader, sortKey);
remainingChannels++;
final FramePlus framePlus = makeFramePlus(frame, frameReader);
if (framePlus.isDone()) {
await.add(i);
} else {
currentFrames[i] = framePlus;
remainingChannels++;
}
} else if (!channel.isFinished()) {
await.add(i);
}
Expand All @@ -295,26 +317,89 @@ private IntSet populateCurrentFramesAndTournamentTree()
return await;
}

/**
 * Builds a {@link FramePlus} for a frame, restricted to the key range covered by {@link #partitions}.
 *
 * The cursor is positioned at the row returned by {@link #findRow} for the start key of the first partition, and
 * the end row is the row returned by {@link #findRow} for the end key of the last partition. When the last
 * partition has no end key, the end row is the number of rows in the frame.
 *
 * @param frame       frame to read
 * @param frameReader reader used to create the cursor and the comparison widget for the frame
 *
 * @return a {@link FramePlus} whose cursor starts at the computed start row and that ends at the computed end row
 */
private FramePlus makeFramePlus(
    final Frame frame,
    final FrameReader frameReader
)
{
  final FrameCursor frameCursor = FrameProcessors.makeCursor(frame, frameReader);
  final FrameComparisonWidget widget = frameReader.makeComparisonWidget(frame, sortKey);

  // Skip ahead to the first row at or after the start of the first partition.
  final RowKey firstKey = partitions.get(0).getStart();
  frameCursor.setCurrentRow(findRow(frame, widget, firstKey));

  // A null end key on the last partition means there is no upper bound: read through the end of the frame.
  final RowKey lastKey = partitions.get(partitions.size() - 1).getEnd();
  final int endRow = lastKey == null ? frame.numRows() : findRow(frame, widget, lastKey);

  return new FramePlus(frameCursor, widget, endRow);
}

/**
 * Find the first row in a frame with a key equal to, or greater than, the provided key, using binary search.
 * The result is only meaningful if the rows of the frame are sorted in the order defined by the comparison widget.
 *
 * @param frame            frame to search; only its row count is read here
 * @param comparisonWidget widget used to compare a row of the frame against the provided key
 * @param key              key to search for, or null
 *
 * @return 0 if the input key is null; otherwise the index of the first row whose key is equal to, or greater than,
 * the provided key, or {@code frame.numRows()} if every row compares less than the key
 */
static int findRow(
    final Frame frame,
    final FrameComparisonWidget comparisonWidget,
    @Nullable final RowKey key
)
{
  if (key == null) {
    return 0;
  }

  // Search the half-open range [minIndex, maxIndex). Rows before minIndex are known to compare less than the key;
  // rows at or after maxIndex are known to compare equal to, or greater than, the key.
  int minIndex = 0;
  int maxIndex = frame.numRows();

  while (minIndex < maxIndex) {
    // Unsigned shift rather than "/ 2": the midpoint stays correct even if minIndex + maxIndex overflows int.
    final int currIndex = (minIndex + maxIndex) >>> 1;
    final int cmp = comparisonWidget.compare(currIndex, key);

    if (cmp < 0) {
      // Row at currIndex is before the key; the answer must be after it.
      minIndex = currIndex + 1;
    } else {
      // Row at currIndex is at or after the key; it remains a candidate.
      maxIndex = currIndex;
    }
  }

  return minIndex;
}

/**
* Class that encapsulates the apparatus necessary for reading a {@link Frame}.
*/
private static class FramePlus
{
private final Cursor cursor;
private final FrameCursor cursor;
private final FrameComparisonWidget comparisonWidget;
private int rowNumber;
private final int endRow;

public FramePlus(
final FrameCursor cursor,
final FrameComparisonWidget comparisonWidget,
final int endRow
)
{
this.cursor = cursor;
this.comparisonWidget = comparisonWidget;
this.endRow = endRow;
}

private FramePlus(Frame frame, FrameReader frameReader, List<KeyColumn> sortKey)
public int rowNumber()
{
this.cursor = FrameProcessors.makeCursor(frame, frameReader);
this.comparisonWidget = frameReader.makeComparisonWidget(frame, sortKey);
this.rowNumber = 0;
return cursor.getCurrentRow();
}

private void advance()
public boolean isDone()
{
cursor.advance();
rowNumber++;
return cursor.getCurrentRow() >= endRow;
}
}
}
Loading
Loading