Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ExecutorScalingQueue to workaround LinkedTransferQueue JDK bug #104347

Merged
merged 11 commits into from
Jan 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,28 @@ public boolean offer(E e) {
}
}

// The following three methods, put, add, and timed offer, are
// overridden to workaround a JDK bug introduced in JDK 21.0.2
// https://bugs.openjdk.org/browse/JDK-8323659
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void put(E e) {
boolean added = super.offer(e);
assert added;
Copy link
Contributor Author

@ChrisHegarty ChrisHegarty Jan 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works because LTQ is unbounded, offer will always succeed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 maybe add this as a code comment

}

@Override
public boolean add(E e) {
boolean added = super.offer(e);
assert added;
return added;
}

@Override
public boolean offer(E e, long timeout, TimeUnit unit) {
boolean added = super.offer(e);
assert added;
return added;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matcher;

import java.util.Locale;
Expand Down Expand Up @@ -501,4 +502,41 @@ public void testNodeProcessorsFloatValidation() {
}
}

// This test must complete to ensure that our basic infrastructure is working as expected.
// Specifically that ExecutorScalingQueue, which subclasses LinkedTransferQueue, correctly
// tracks tasks submitted to the executor.
public void testLTQIssue() {
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
final var executorService = EsExecutors.newScaling(
"test",
0,
1,
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
60,
TimeUnit.SECONDS,
true,
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
EsExecutors.daemonThreadFactory("test"),
new ThreadContext(Settings.EMPTY)
);
try {
final var countDownLatch = new CountDownLatch(10);
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
class TestTask extends AbstractRunnable {
@Override
protected void doRun() {
countDownLatch.countDown();
if (countDownLatch.getCount() > 0) {
executorService.execute(TestTask.this);
}
}

@Override
public void onFailure(Exception e) {
fail(e);
}
}

executorService.execute(new TestTask());
safeAwait(countDownLatch);
} finally {
ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.TimeUnit;

public class ExecutorScalingQueueTests extends ESTestCase {

public void testPut() {
var queue = new EsExecutors.ExecutorScalingQueue<>();
queue.put(new Object());
assertEquals(queue.size(), 1);
}

public void testAdd() {
var queue = new EsExecutors.ExecutorScalingQueue<>();
queue.add(new Object());
assertEquals(queue.size(), 1);
}

public void testTimedOffer() {
var queue = new EsExecutors.ExecutorScalingQueue<>();
queue.offer(new Object(), 60, TimeUnit.SECONDS);
assertEquals(queue.size(), 1);
}

}