Skip to content

Commit

Permalink
Update ExecutorScalingQueue to workaround LinkedTransferQueue JDK bug (
Browse files Browse the repository at this point in the history
…#104347) (#104426)

This commit adds a few overrides to ExecutorScalingQueue (subclass of LinkedTransferQueue) to workaround a JDK bug in LinkedTransferQueue.
  • Loading branch information
ChrisHegarty committed Jan 17, 2024
1 parent 96d83cd commit f2ef03a
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,29 @@ public boolean offer(E e) {
}
}

// Overridden to workaround a JDK bug introduced in JDK 21.0.2
// https://bugs.openjdk.org/browse/JDK-8323659
@Override
public void put(E e) {
// As the queue is unbounded, this method will always add to the queue.
super.offer(e);
}

// Overridden to workaround a JDK bug introduced in JDK 21.0.2
// https://bugs.openjdk.org/browse/JDK-8323659
@Override
public boolean add(E e) {
// As the queue is unbounded, this method will never return false.
return super.offer(e);
}

// Overridden to workaround a JDK bug introduced in JDK 21.0.2
// https://bugs.openjdk.org/browse/JDK-8323659
@Override
public boolean offer(E e, long timeout, TimeUnit unit) {
// As the queue is unbounded, this method will never return false.
return super.offer(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matcher;

import java.util.Locale;
Expand Down Expand Up @@ -501,4 +502,41 @@ public void testNodeProcessorsFloatValidation() {
}
}

// This test must complete to ensure that our basic infrastructure is working as expected.
// Specifically that ExecutorScalingQueue, which subclasses LinkedTransferQueue, correctly
// tracks tasks submitted to the executor.
public void testBasicTaskExecution() {
final var executorService = EsExecutors.newScaling(
"test",
0,
between(1, 5),
60,
TimeUnit.SECONDS,
randomBoolean(),
EsExecutors.daemonThreadFactory("test"),
new ThreadContext(Settings.EMPTY)
);
try {
final var countDownLatch = new CountDownLatch(between(1, 10));
class TestTask extends AbstractRunnable {
@Override
protected void doRun() {
countDownLatch.countDown();
if (countDownLatch.getCount() > 0) {
executorService.execute(TestTask.this);
}
}

@Override
public void onFailure(Exception e) {
fail(e);
}
}

executorService.execute(new TestTask());
safeAwait(countDownLatch);
} finally {
ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.TimeUnit;

public class ExecutorScalingQueueTests extends ESTestCase {

public void testPut() {
var queue = new EsExecutors.ExecutorScalingQueue<>();
queue.put(new Object());
assertEquals(queue.size(), 1);
}

public void testAdd() {
var queue = new EsExecutors.ExecutorScalingQueue<>();
assertTrue(queue.add(new Object()));
assertEquals(queue.size(), 1);
}

public void testTimedOffer() {
var queue = new EsExecutors.ExecutorScalingQueue<>();
assertTrue(queue.offer(new Object(), 60, TimeUnit.SECONDS));
assertEquals(queue.size(), 1);
}

}

0 comments on commit f2ef03a

Please sign in to comment.