From bbcbf7003f27198d71ae7f3cc46acdb2d991926c Mon Sep 17 00:00:00 2001
From: abhishekagarwal87 <1477457+abhishekagarwal87@users.noreply.github.com>
Date: Wed, 18 Sep 2024 11:10:26 +0530
Subject: [PATCH] PostJoinCursor should never advance without interruption
---
.../java/org/apache/druid/segment/Cursor.java | 3 ++-
.../druid/segment/join/PostJoinCursor.java | 25 ++++++-------------
.../segment/join/PostJoinCursorTest.java | 21 +++++++++++++---
3 files changed, 28 insertions(+), 21 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/segment/Cursor.java b/processing/src/main/java/org/apache/druid/segment/Cursor.java
index 14a53eb41b7b..eb7c72b09e9a 100644
--- a/processing/src/main/java/org/apache/druid/segment/Cursor.java
+++ b/processing/src/main/java/org/apache/druid/segment/Cursor.java
@@ -69,7 +69,8 @@ public interface Cursor
/**
* Advance to the cursor to the next position. Callers should check {@link #isDone()} or
- * {@link #isDoneOrInterrupted()} before getting the next value from a selector.
+ * {@link #isDoneOrInterrupted()} before getting the next value from a selector. However, the underlying
+ * implementation may still check for thread interruption if advancing the cursor is a long-running operation.
*/
void advanceUninterruptibly();
diff --git a/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java b/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java
index 5d9848d74718..571806d50ee6 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java
@@ -84,21 +84,6 @@ private void advanceToMatch()
}
}
- /**
- * Matches tuples coming out of a join to a post-join condition uninterruptibly, and hence can be a long-running call.
- * For this reason, {@link PostJoinCursor#advance()} instead calls {@link PostJoinCursor#advanceToMatch()} (unlike
- * other cursors) that allows interruptions, thereby resolving issues where the
- * CPU thread running PostJoinCursor cannot be terminated
- */
- private void advanceToMatchUninterruptibly()
- {
- if (valueMatcher != null) {
- while (!isDone() && !valueMatcher.matches(false)) {
- baseCursor.advanceUninterruptibly();
- }
- }
- }
-
@Override
public ColumnSelectorFactory getColumnSelectorFactory()
{
@@ -120,11 +105,17 @@ public void advance()
advanceToMatch();
}
+
+ /**
+ * Advancing the post-join cursor requires evaluating the join over the whole segment, so advancing without
+ * interruption can take a long time when the join itself is big but there are no matches. This can leave the thread
+ * running well after the timeout elapses. One such issue is described in
+ * "CPU thread running PostJoinCursor cannot be terminated".
+ */
@Override
public void advanceUninterruptibly()
{
- baseCursor.advanceUninterruptibly();
- advanceToMatchUninterruptibly();
+ advance();
}
@Override
diff --git a/processing/src/test/java/org/apache/druid/segment/join/PostJoinCursorTest.java b/processing/src/test/java/org/apache/druid/segment/join/PostJoinCursorTest.java
index ba194ace2c61..364e12ccd2da 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/PostJoinCursorTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/PostJoinCursorTest.java
@@ -200,6 +200,17 @@ public Throwable getException()
@Test
public void testAdvanceWithInterruption() throws IOException, InterruptedException
+ {
+ testAdvance(true);
+ }
+
+ @Test
+ public void testAdvanceWithoutInterruption() throws IOException, InterruptedException
+ {
+ testAdvance(false);
+ }
+
+ private void testAdvance(boolean withInterruption) throws IOException, InterruptedException
{
final int rowsBeforeInterrupt = 1000;
@@ -214,7 +225,7 @@ public void testAdvanceWithInterruption() throws IOException, InterruptedExcepti
countriesTable = JoinTestHelper.createCountriesIndexedTable();
- Thread joinCursorThread = new Thread(() -> makeCursorAndAdvance());
+ Thread joinCursorThread = new Thread(() -> makeCursorAndAdvance(withInterruption));
ExceptionHandler exceptionHandler = new ExceptionHandler();
joinCursorThread.setUncaughtExceptionHandler(exceptionHandler);
joinCursorThread.start();
@@ -234,7 +245,7 @@ public void testAdvanceWithInterruption() throws IOException, InterruptedExcepti
fail();
}
- public void makeCursorAndAdvance()
+ public void makeCursorAndAdvance(boolean withInterruption)
{
List joinableClauses = ImmutableList.of(
@@ -272,7 +283,11 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
}
});
- cursor.advance();
+ if (withInterruption) {
+ cursor.advance();
+ } else {
+ cursor.advanceUninterruptibly();
+ }
}
}
}