diff --git a/processing/src/main/java/org/apache/druid/segment/Cursor.java b/processing/src/main/java/org/apache/druid/segment/Cursor.java index 14a53eb41b7b..eb7c72b09e9a 100644 --- a/processing/src/main/java/org/apache/druid/segment/Cursor.java +++ b/processing/src/main/java/org/apache/druid/segment/Cursor.java @@ -69,7 +69,8 @@ public interface Cursor /** * Advance to the cursor to the next position. Callers should check {@link #isDone()} or - * {@link #isDoneOrInterrupted()} before getting the next value from a selector. + * {@link #isDoneOrInterrupted()} before getting the next value from a selector. However, underlying + * implementation may still check for thread interruption if advancing the cursor is a long-running operation. */ void advanceUninterruptibly(); diff --git a/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java b/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java index 5d9848d74718..571806d50ee6 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java +++ b/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java @@ -84,21 +84,6 @@ private void advanceToMatch() } } - /** - * Matches tuples coming out of a join to a post-join condition uninterruptibly, and hence can be a long-running call. - * For this reason, {@link PostJoinCursor#advance()} instead calls {@link PostJoinCursor#advanceToMatch()} (unlike - * other cursors) that allows interruptions, thereby resolving issues where the - * CPU thread running PostJoinCursor cannot be terminated - */ - private void advanceToMatchUninterruptibly() - { - if (valueMatcher != null) { - while (!isDone() && !valueMatcher.matches(false)) { - baseCursor.advanceUninterruptibly(); - } - } - } - @Override public ColumnSelectorFactory getColumnSelectorFactory() { @@ -120,11 +105,17 @@ public void advance() advanceToMatch(); } + + /** + * Advancing the post-join requires evaluating the join on whole segment and advancing without interruption can take + * a long time if there are no matches but the join itself is big. This can leave the thread running well after + * the timeout elapses. One such issue is described in + * CPU thread running PostJoinCursor cannot be terminated + */ @Override public void advanceUninterruptibly() { - baseCursor.advanceUninterruptibly(); - advanceToMatchUninterruptibly(); + advance(); } @Override diff --git a/processing/src/test/java/org/apache/druid/segment/join/PostJoinCursorTest.java b/processing/src/test/java/org/apache/druid/segment/join/PostJoinCursorTest.java index ba194ace2c61..364e12ccd2da 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/PostJoinCursorTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/PostJoinCursorTest.java @@ -200,6 +200,17 @@ public Throwable getException() @Test public void testAdvanceWithInterruption() throws IOException, InterruptedException + { + testAdvance(true); + } + + @Test + public void testAdvanceWithoutInterruption() throws IOException, InterruptedException + { + testAdvance(false); + } + + private void testAdvance(boolean withInterruption) throws IOException, InterruptedException { final int rowsBeforeInterrupt = 1000; @@ -214,7 +225,7 @@ public void testAdvanceWithInterruption() throws IOException, InterruptedExcepti countriesTable = JoinTestHelper.createCountriesIndexedTable(); - Thread joinCursorThread = new Thread(() -> makeCursorAndAdvance()); + Thread joinCursorThread = new Thread(() -> makeCursorAndAdvance(withInterruption)); ExceptionHandler exceptionHandler = new ExceptionHandler(); joinCursorThread.setUncaughtExceptionHandler(exceptionHandler); joinCursorThread.start(); @@ -234,7 +245,7 @@ public void testAdvanceWithInterruption() throws IOException, InterruptedExcepti fail(); } - public void makeCursorAndAdvance() + public void makeCursorAndAdvance(boolean withInterruption) { List joinableClauses = ImmutableList.of( @@ -272,7 +283,11 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) } }); - cursor.advance(); + if (withInterruption) { + cursor.advance(); + } else { + cursor.advanceUninterruptibly(); + } } } }