From 288fb79a3b45d8bd086f4aff6085c42ed6d939d7 Mon Sep 17 00:00:00 2001 From: ChrisHegarty Date: Sat, 13 Jan 2024 13:52:10 +0000 Subject: [PATCH 1/7] Workaround LinkedTransferQueue JDK bug --- .../common/util/concurrent/EsExecutors.java | 17 +++++++++ .../util/concurrent/EsExecutorsTests.java | 38 +++++++++++++++++++ .../concurrent/ExecutorScalingQueueTests.java | 35 +++++++++++++++++ 3 files changed, 90 insertions(+) create mode 100644 server/src/test/java/org/elasticsearch/common/util/concurrent/ExecutorScalingQueueTests.java diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index d7774d5c0a7ea..c798d7f8036a8 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -329,6 +329,23 @@ public boolean offer(E e) { } } + // The following three methods, put, add, and timed offer, are + // overridden to workaround a JDK bug introduced in JDK 21.0.2 + // https://bugs.openjdk.org/browse/JDK-8323659 + @Override + public void put(E e) { + super.offer(e); + } + + @Override + public boolean add(E e) { + return super.offer(e); + } + + @Override + public boolean offer(E e, long timeout, TimeUnit unit) { + return super.offer(e); + } } /** diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index fb9bde31e8fc4..8de57f7dc1040 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Processors; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matcher; import java.util.Locale; @@ -501,4 +502,41 @@ public void testNodeProcessorsFloatValidation() { } } + // This test must complete to ensure that our basic infrastructure is working as expected. + // Specifically that ExecutorScalingQueue, which subclasses LinkedTransferQueue, correctly + // tracks tasks submitted to the executor. + public void testLTQIssue() { + final var executorService = EsExecutors.newScaling( + "test", + 0, + 1, + 60, + TimeUnit.SECONDS, + true, + EsExecutors.daemonThreadFactory("test"), + new ThreadContext(Settings.EMPTY) + ); + try { + final var countDownLatch = new CountDownLatch(10); + class TestTask extends AbstractRunnable { + @Override + protected void doRun() { + countDownLatch.countDown(); + if (countDownLatch.getCount() > 0) { + executorService.execute(TestTask.this); + } + } + + @Override + public void onFailure(Exception e) { + fail(e); + } + } + + executorService.execute(new TestTask()); + safeAwait(countDownLatch); + } finally { + ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS); + } + } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ExecutorScalingQueueTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ExecutorScalingQueueTests.java new file mode 100644 index 0000000000000..8c2f56f879da3 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ExecutorScalingQueueTests.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.TimeUnit; + +public class ExecutorScalingQueueTests extends ESTestCase { + + public void testPut() { + var queue = new EsExecutors.ExecutorScalingQueue<>(); + queue.put(new Object()); + assertEquals(queue.size(), 1); + } + + public void testAdd() { + var queue = new EsExecutors.ExecutorScalingQueue<>(); + queue.add(new Object()); + assertEquals(queue.size(), 1); + } + + public void testTimedOffer() { + var queue = new EsExecutors.ExecutorScalingQueue<>(); + queue.offer(new Object(), 60, TimeUnit.SECONDS); + assertEquals(queue.size(), 1); + } + +} From ac96b957b8bc94df3708abec05e470f3439f2354 Mon Sep 17 00:00:00 2001 From: ChrisHegarty Date: Sat, 13 Jan 2024 16:31:48 +0000 Subject: [PATCH 2/7] asserts --- .../common/util/concurrent/EsExecutors.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index c798d7f8036a8..7ea9a8b3fcb78 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -334,17 +334,22 @@ public boolean offer(E e) { // https://bugs.openjdk.org/browse/JDK-8323659 @Override public void put(E e) { - super.offer(e); + boolean added = super.offer(e); + assert added; } @Override public boolean add(E e) { - return super.offer(e); + boolean added = super.offer(e); + assert added; + return added; } @Override public boolean offer(E e, long timeout, TimeUnit unit) { - return super.offer(e); + boolean added = super.offer(e); + assert added; + return added; } } From 0b874722fc52f8a72f5541bf64544977d6648351 Mon Sep 17 00:00:00 2001 From: Chris Hegarty <62058229+ChrisHegarty@users.noreply.github.com> Date: Tue, 16 Jan 2024 09:50:09 +0000 Subject: [PATCH 3/7] Update server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java Co-authored-by: David Turner --- .../elasticsearch/common/util/concurrent/EsExecutorsTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 8de57f7dc1040..a58b641c79730 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -517,7 +517,7 @@ public void testLTQIssue() { new ThreadContext(Settings.EMPTY) ); try { - final var countDownLatch = new CountDownLatch(10); + final var countDownLatch = new CountDownLatch(between(1, 10)); class TestTask extends AbstractRunnable { @Override protected void doRun() { From 6deae63dd29cabc0faa9b51669b1fd3fee33ee29 Mon Sep 17 00:00:00 2001 From: Chris Hegarty <62058229+ChrisHegarty@users.noreply.github.com> Date: Tue, 16 Jan 2024 09:50:20 +0000 Subject: [PATCH 4/7] Update server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java Co-authored-by: David Turner --- .../elasticsearch/common/util/concurrent/EsExecutorsTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index a58b641c79730..e78c1d2f8508f 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -512,7 +512,7 @@ public void testLTQIssue() { 1, 60, TimeUnit.SECONDS, - true, + randomBoolean(), EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY) ); From 6139d175af125ac88b0c3f9ce849572a7b67b58c Mon Sep 17 00:00:00 2001 From: Chris Hegarty <62058229+ChrisHegarty@users.noreply.github.com> Date: Tue, 16 Jan 2024 09:50:30 +0000 Subject: [PATCH 5/7] Update server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java Co-authored-by: David Turner --- .../elasticsearch/common/util/concurrent/EsExecutorsTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index e78c1d2f8508f..8ace2f58819cc 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -509,7 +509,7 @@ public void testLTQIssue() { final var executorService = EsExecutors.newScaling( "test", 0, - 1, + between(1,5), 60, TimeUnit.SECONDS, randomBoolean(), From 1f2ed138f9f1040068e4d301d8db51f0ae9145d4 Mon Sep 17 00:00:00 2001 From: ChrisHegarty Date: Tue, 16 Jan 2024 10:00:21 +0000 Subject: [PATCH 6/7] review comments --- .../common/util/concurrent/EsExecutors.java | 21 ++++++++++--------- .../util/concurrent/EsExecutorsTests.java | 2 +- .../concurrent/ExecutorScalingQueueTests.java | 4 ++-- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 7ea9a8b3fcb78..e6865e5c66e74 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -329,27 +329,28 @@ public boolean offer(E e) { } } - // The following three methods, put, add, and timed offer, are - // overridden to workaround a JDK bug introduced in JDK 21.0.2 + // Overridden to workaround a JDK bug introduced in JDK 21.0.2 // https://bugs.openjdk.org/browse/JDK-8323659 @Override public void put(E e) { - boolean added = super.offer(e); - assert added; + // As the queue is unbounded, this method will always add to the queue. + super.offer(e); } + // Overridden to workaround a JDK bug introduced in JDK 21.0.2 + // https://bugs.openjdk.org/browse/JDK-8323659 @Override public boolean add(E e) { - boolean added = super.offer(e); - assert added; - return added; + // As the queue is unbounded, this method will never return false. + return super.offer(e); } + // Overridden to workaround a JDK bug introduced in JDK 21.0.2 + // https://bugs.openjdk.org/browse/JDK-8323659 @Override public boolean offer(E e, long timeout, TimeUnit unit) { - boolean added = super.offer(e); - assert added; - return added; + // As the queue is unbounded, this method will never return false. + return super.offer(e); } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 8ace2f58819cc..a03f5074517bd 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -505,7 +505,7 @@ public void testNodeProcessorsFloatValidation() { // This test must complete to ensure that our basic infrastructure is working as expected. // Specifically that ExecutorScalingQueue, which subclasses LinkedTransferQueue, correctly // tracks tasks submitted to the executor. - public void testLTQIssue() { + public void testBasicTaskExecution() { final var executorService = EsExecutors.newScaling( "test", 0, diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ExecutorScalingQueueTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ExecutorScalingQueueTests.java index 8c2f56f879da3..b1e1b9d620d2a 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/ExecutorScalingQueueTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ExecutorScalingQueueTests.java @@ -22,13 +22,13 @@ public void testPut() { public void testAdd() { var queue = new EsExecutors.ExecutorScalingQueue<>(); - queue.add(new Object()); + assertTrue(queue.add(new Object())); assertEquals(queue.size(), 1); } public void testTimedOffer() { var queue = new EsExecutors.ExecutorScalingQueue<>(); - queue.offer(new Object(), 60, TimeUnit.SECONDS); + assertTrue(queue.offer(new Object(), 60, TimeUnit.SECONDS)); assertEquals(queue.size(), 1); } From 6487767210b5135538240afa7b63f097d93ede39 Mon Sep 17 00:00:00 2001 From: ChrisHegarty Date: Tue, 16 Jan 2024 10:21:43 +0000 Subject: [PATCH 7/7] spotless --- .../elasticsearch/common/util/concurrent/EsExecutorsTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index a03f5074517bd..cb1dddd7c51f3 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -509,7 +509,7 @@ public void testBasicTaskExecution() { final var executorService = EsExecutors.newScaling( "test", 0, - between(1,5), + between(1, 5), 60, TimeUnit.SECONDS, randomBoolean(),