Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ExecutorScalingQueue to workaround LinkedTransferQueue JDK bug #104347

Merged
merged 11 commits into from
Jan 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,29 @@ public boolean offer(E e) {
}
}

// Overridden to workaround a JDK bug introduced in JDK 21.0.2
// https://bugs.openjdk.org/browse/JDK-8323659
@Override
public void put(E e) {
// As the queue is unbounded, this method will always add to the queue.
super.offer(e);
}

// Overridden to workaround a JDK bug introduced in JDK 21.0.2
// https://bugs.openjdk.org/browse/JDK-8323659
@Override
public boolean add(E e) {
// As the queue is unbounded, this method will never return false.
return super.offer(e);
}

// Overridden to workaround a JDK bug introduced in JDK 21.0.2
// https://bugs.openjdk.org/browse/JDK-8323659
@Override
public boolean offer(E e, long timeout, TimeUnit unit) {
// As the queue is unbounded, this method will never return false.
return super.offer(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matcher;

import java.util.Locale;
Expand Down Expand Up @@ -501,4 +502,41 @@ public void testNodeProcessorsFloatValidation() {
}
}

// This test must complete to ensure that our basic infrastructure is working as expected.
// Specifically that ExecutorScalingQueue, which subclasses LinkedTransferQueue, correctly
// tracks tasks submitted to the executor.
public void testBasicTaskExecution() {
final var executorService = EsExecutors.newScaling(
"test",
0,
between(1,5),
60,
TimeUnit.SECONDS,
randomBoolean(),
EsExecutors.daemonThreadFactory("test"),
new ThreadContext(Settings.EMPTY)
);
try {
final var countDownLatch = new CountDownLatch(between(1, 10));
class TestTask extends AbstractRunnable {
@Override
protected void doRun() {
countDownLatch.countDown();
if (countDownLatch.getCount() > 0) {
executorService.execute(TestTask.this);
}
}

@Override
public void onFailure(Exception e) {
fail(e);
}
}

executorService.execute(new TestTask());
safeAwait(countDownLatch);
} finally {
ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.TimeUnit;

public class ExecutorScalingQueueTests extends ESTestCase {

public void testPut() {
var queue = new EsExecutors.ExecutorScalingQueue<>();
queue.put(new Object());
assertEquals(queue.size(), 1);
}

public void testAdd() {
var queue = new EsExecutors.ExecutorScalingQueue<>();
assertTrue(queue.add(new Object()));
assertEquals(queue.size(), 1);
}

public void testTimedOffer() {
var queue = new EsExecutors.ExecutorScalingQueue<>();
assertTrue(queue.offer(new Object(), 60, TimeUnit.SECONDS));
assertEquals(queue.size(), 1);
}

}