Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Joins can take forever to finish on historicals without ever timing out. #17099

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public interface Cursor

/**
* Advance to the cursor to the next position. Callers should check {@link #isDone()} or
* {@link #isDoneOrInterrupted()} before getting the next value from a selector.
* {@link #isDoneOrInterrupted()} before getting the next value from a selector. However, underlying
* implementation may still check for thread interruption if advancing the cursor is a long-running operation.
*/
void advanceUninterruptibly();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,6 @@ private void advanceToMatch()
}
}

/**
* Matches tuples coming out of a join to a post-join condition uninterruptibly, and hence can be a long-running call.
* For this reason, {@link PostJoinCursor#advance()} instead calls {@link PostJoinCursor#advanceToMatch()} (unlike
* other cursors) that allows interruptions, thereby resolving issues where the
* <a href="https://github.com/apache/druid/issues/14514">CPU thread running PostJoinCursor cannot be terminated</a>
*/
private void advanceToMatchUninterruptibly()
{
if (valueMatcher != null) {
while (!isDone() && !valueMatcher.matches(false)) {
baseCursor.advanceUninterruptibly();
}
}
}

@Override
public ColumnSelectorFactory getColumnSelectorFactory()
{
Expand All @@ -120,11 +105,17 @@ public void advance()
advanceToMatch();
}


/**
* Advancing the post-join requires evaluating the join on whole segment and advancing without interruption can take
* a long time if there are no matches but the join itself is big. This can leave the thread running well after
* the timeout elapses. One such issue is described in
* <a href="https://github.com/apache/druid/issues/14514">CPU thread running PostJoinCursor cannot be terminated</a>
*/
@Override
public void advanceUninterruptibly()
{
baseCursor.advanceUninterruptibly();
advanceToMatchUninterruptibly();
advance();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,17 @@ public Throwable getException()

@Test
public void testAdvanceWithInterruption() throws IOException, InterruptedException
{
testAdvance(true);
}

@Test
public void testAdvanceWithoutInterruption() throws IOException, InterruptedException
{
testAdvance(false);
}

private void testAdvance(boolean withInterruption) throws IOException, InterruptedException
{

final int rowsBeforeInterrupt = 1000;
Expand All @@ -214,7 +225,7 @@ public void testAdvanceWithInterruption() throws IOException, InterruptedExcepti

countriesTable = JoinTestHelper.createCountriesIndexedTable();

Thread joinCursorThread = new Thread(() -> makeCursorAndAdvance());
Thread joinCursorThread = new Thread(() -> makeCursorAndAdvance(withInterruption));
ExceptionHandler exceptionHandler = new ExceptionHandler();
joinCursorThread.setUncaughtExceptionHandler(exceptionHandler);
joinCursorThread.start();
Expand All @@ -234,7 +245,7 @@ public void testAdvanceWithInterruption() throws IOException, InterruptedExcepti
fail();
}

public void makeCursorAndAdvance()
public void makeCursorAndAdvance(boolean withInterruption)
{

List<JoinableClause> joinableClauses = ImmutableList.of(
Expand Down Expand Up @@ -272,7 +283,11 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
}
});

cursor.advance();
if (withInterruption) {
cursor.advance();
} else {
cursor.advanceUninterruptibly();
}
}
}
}
Loading