Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
Browse files Browse the repository at this point in the history
…99170_fix
  • Loading branch information
astefan committed Sep 4, 2023
2 parents 7ab7dbe + c9a2555 commit 93b844d
Show file tree
Hide file tree
Showing 53 changed files with 202 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ A sample response:
----
{
"name": "my-app",
"indices": [ "index1", "index2" ],
"updated_at_millis": 1682105622204,
"template": {
"script": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
Expand Down Expand Up @@ -156,7 +157,7 @@ void scheduleTask() {
job = threadPool.scheduleWithFixedDelay(
() -> perform(() -> LOGGER.debug("completed tsdb update task")),
pollInterval,
ThreadPool.Names.SAME
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,7 @@ public void testDownsampling() throws Exception {
Settings.builder()
.put(firstGenMetadata.getSettings())
.put(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME_KEY, firstGenIndexName)
.put(IndexMetadata.INDEX_DOWNSAMPLE_ORIGIN_NAME_KEY, firstGenIndexName)
.put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), SUCCESS)
)
.numberOfReplicas(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@
@SuppressForbidden(reason = "It wraps a ThreadPool and delegates all the work")
public class ReactorScheduledExecutorService extends AbstractExecutorService implements ScheduledExecutorService {
private final ThreadPool threadPool;
private final String executorName;
private final ExecutorService delegate;
private final Logger logger = LogManager.getLogger(ReactorScheduledExecutorService.class);

public ReactorScheduledExecutorService(ThreadPool threadPool, String executorName) {
this.threadPool = threadPool;
this.executorName = executorName;
this.delegate = threadPool.executor(executorName);
}

Expand All @@ -54,14 +52,14 @@ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUni
} catch (Exception e) {
throw new RuntimeException(e);
}
}, new TimeValue(delay, unit), executorName);
}, new TimeValue(delay, unit), delegate);

return new ReactorFuture<>(schedule);
}

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
Runnable decoratedCommand = decorateRunnable(command);
Scheduler.ScheduledCancellable schedule = threadPool.schedule(decoratedCommand, new TimeValue(delay, unit), executorName);
Scheduler.ScheduledCancellable schedule = threadPool.schedule(decoratedCommand, new TimeValue(delay, unit), delegate);
return new ReactorFuture<>(schedule);
}

Expand All @@ -75,11 +73,7 @@ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDela
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug(
() -> format(
"could not schedule execution of [%s] on [%s] as executor is shut down",
decoratedCommand,
executorName
),
() -> format("could not schedule execution of [%s] on [%s] as executor is shut down", decoratedCommand, delegate),
e
);
} else {
Expand All @@ -93,7 +87,7 @@ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDela
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
Runnable decorateRunnable = decorateRunnable(command);

Scheduler.Cancellable cancellable = threadPool.scheduleWithFixedDelay(decorateRunnable, new TimeValue(delay, unit), executorName);
Scheduler.Cancellable cancellable = threadPool.scheduleWithFixedDelay(decorateRunnable, new TimeValue(delay, unit), delegate);

return new ReactorFuture<>(cancellable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -111,7 +112,7 @@ public Collection<Object> createComponents(
if (rc < 0) {
logger.warn("extending startup timeout via sd_notify failed with [{}]", rc);
}
}, TimeValue.timeValueSeconds(15), ThreadPool.Names.SAME));
}, TimeValue.timeValueSeconds(15), EsExecutors.DIRECT_EXECUTOR_SERVICE));
return List.of();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.systemd;

import org.elasticsearch.Build;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -30,6 +31,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -47,8 +49,13 @@ public class SystemdPluginTests extends ESTestCase {

{
when(extender.cancel()).thenReturn(true);
when(threadPool.scheduleWithFixedDelay(any(Runnable.class), eq(TimeValue.timeValueSeconds(15)), eq(ThreadPool.Names.SAME)))
.thenReturn(extender);
when(
threadPool.scheduleWithFixedDelay(
any(Runnable.class),
eq(TimeValue.timeValueSeconds(15)),
same(EsExecutors.DIRECT_EXECUTOR_SERVICE)
)
).thenReturn(extender);
}

public void testIsEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public static void cleanupFiles() {
FileUtils.rm(instancesFile, certificatesFile);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99153")
public void test10Install() throws Exception {
install();
// Disable security auto-configuration as we want to generate keys/certificates manually here
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/elasticsearch/TransportVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId
public static final TransportVersion V_8_500_066 = registerTransportVersion(8_500_066, "F398ECC6-5D2A-4BD8-A9E8-1101F030DF85");
public static final TransportVersion V_8_500_067 = registerTransportVersion(8_500_067, "a7c86604-a917-4aff-9a1b-a4d44c3dbe02");
public static final TransportVersion V_8_500_068 = registerTransportVersion(8_500_068, "2683c8b4-5372-4a6a-bb3a-d61aa679089a");
public static final TransportVersion V_8_500_069 = registerTransportVersion(8_500_069, "5b804027-d8a0-421b-9970-1f53d766854b");

/*
* STOP! READ THIS FIRST! No, really,
Expand All @@ -205,7 +206,7 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId
*/

private static class CurrentHolder {
private static final TransportVersion CURRENT = findCurrent(V_8_500_068);
private static final TransportVersion CURRENT = findCurrent(V_8_500_069);

// finds the pluggable current version, or uses the given fallback
private static TransportVersion findCurrent(TransportVersion fallback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
future.addTimeout(
requireNonNullElse(request.getTimeout(), DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT),
threadPool,
ThreadPool.Names.SAME
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
} else {
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -142,7 +143,7 @@ protected void processTasks(CancellableTask nodeTask, ListTasksRequest request,
future.addTimeout(
requireNonNullElse(request.getTimeout(), DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT),
threadPool,
ThreadPool.Names.SAME
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
nodeTask.addListener(() -> future.onFailure(new TaskCancelledException("task cancelled")));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.threadpool.ScheduledExecutorServiceScheduler;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentType;

import java.io.Closeable;
Expand Down Expand Up @@ -464,7 +464,7 @@ public boolean isCancelled() {
}
};
}
return scheduler.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.GENERIC);
return scheduler.scheduleWithFixedDelay(new Flush(), flushInterval, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

// needs to be executed under a lock
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/action/bulk/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -136,7 +136,7 @@ private void retry(BulkRequest bulkRequestForRetry) {
assert backoff.hasNext();
TimeValue next = backoff.next();
logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
retryCancellable = scheduler.schedule(() -> this.execute(bulkRequestForRetry), next, ThreadPool.Names.SAME);
retryCancellable = scheduler.schedule(() -> this.execute(bulkRequestForRetry), next, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public static Rounding.Prepared createRounding(final String expr, final String t
* prefix-fixedInterval-baseIndexName
*
* Note that this looks for the base index name of the provided index metadata via the
* {@link IndexMetadata#INDEX_DOWNSAMPLE_SOURCE_NAME_KEY} setting. This means that in case
* {@link IndexMetadata#INDEX_DOWNSAMPLE_ORIGIN_NAME_KEY} setting. This means that in case
* the provided index was already downsampled, we'll use the original source index (of the
* current provided downsample index) as the base index name.
*/
Expand All @@ -251,10 +251,13 @@ public static String generateDownsampleIndexName(
IndexMetadata sourceIndexMetadata,
DateHistogramInterval fixedInterval
) {
String downsampleSourceName = sourceIndexMetadata.getSettings().get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME_KEY);
String downsampleOriginName = IndexMetadata.INDEX_DOWNSAMPLE_ORIGIN_NAME.get(sourceIndexMetadata.getSettings());
String sourceIndexName;
if (downsampleSourceName != null) {
sourceIndexName = downsampleSourceName;
if (Strings.hasText(downsampleOriginName)) {
sourceIndexName = downsampleOriginName;
} else if (Strings.hasText(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME.get(sourceIndexMetadata.getSettings()))) {
// bwc for downsample indices created pre 8.10 which didn't configure the origin
sourceIndexName = IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME.get(sourceIndexMetadata.getSettings());
} else {
sourceIndexName = sourceIndexMetadata.getIndex().getName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,14 @@ public void complete(ActionListener<?> listener) {
* work. For instance, it could check that the race is not lost by calling {@link #isDone} whenever appropriate, or it could subscribe
* another listener which performs any necessary cleanup steps.
*/
public void addTimeout(TimeValue timeout, ThreadPool threadPool, String timeoutExecutor) {
public void addTimeout(TimeValue timeout, ThreadPool threadPool, Executor timeoutExecutor) {
if (isDone()) {
return;
}
addListener(ActionListener.running(scheduleTimeout(timeout, threadPool, timeoutExecutor)));
}

private Runnable scheduleTimeout(TimeValue timeout, ThreadPool threadPool, String timeoutExecutor) {
private Runnable scheduleTimeout(TimeValue timeout, ThreadPool threadPool, Executor timeoutExecutor) {
try {
final var cancellable = threadPool.schedule(
() -> onFailure(new ElasticsearchTimeoutException(Strings.format("timed out after [%s/%dms]", timeout, timeout.millis()))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand Down Expand Up @@ -130,7 +130,11 @@ void logBootstrapState(Metadata metadata) {
logger.info("this node is locked into cluster UUID [{}] and will not attempt further cluster bootstrapping", clusterUUID);
} else {
transportService.getThreadPool()
.scheduleWithFixedDelay(() -> logRemovalWarning(clusterUUID), TimeValue.timeValueHours(12), Names.SAME);
.scheduleWithFixedDelay(
() -> logRemovalWarning(clusterUUID),
TimeValue.timeValueHours(12),
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
logRemovalWarning(clusterUUID);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,8 @@ public Index getResizeSourceIndex() {

public static final String INDEX_DOWNSAMPLE_SOURCE_UUID_KEY = "index.downsample.source.uuid";
public static final String INDEX_DOWNSAMPLE_SOURCE_NAME_KEY = "index.downsample.source.name";
public static final String INDEX_DOWNSAMPLE_ORIGIN_NAME_KEY = "index.downsample.origin.name";
public static final String INDEX_DOWNSAMPLE_ORIGIN_UUID_KEY = "index.downsample.origin.uuid";

public static final String INDEX_DOWNSAMPLE_STATUS_KEY = "index.downsample.status";
public static final Setting<String> INDEX_DOWNSAMPLE_SOURCE_UUID = Setting.simpleString(
Expand All @@ -1240,6 +1242,18 @@ public Index getResizeSourceIndex() {
Property.PrivateIndex
);

public static final Setting<String> INDEX_DOWNSAMPLE_ORIGIN_NAME = Setting.simpleString(
INDEX_DOWNSAMPLE_ORIGIN_NAME_KEY,
Property.IndexScope,
Property.PrivateIndex
);

public static final Setting<String> INDEX_DOWNSAMPLE_ORIGIN_UUID = Setting.simpleString(
INDEX_DOWNSAMPLE_ORIGIN_UUID_KEY,
Property.IndexScope,
Property.PrivateIndex
);

public enum DownsampleTaskStatus {
UNKNOWN,
STARTED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_FORMAT_SETTING,
IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME,
IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_UUID,
IndexMetadata.INDEX_DOWNSAMPLE_ORIGIN_NAME,
IndexMetadata.INDEX_DOWNSAMPLE_ORIGIN_UUID,
IndexMetadata.INDEX_DOWNSAMPLE_STATUS,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -25,7 +26,6 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;

import java.io.Closeable;
import java.util.ArrayList;
Expand Down Expand Up @@ -147,7 +147,7 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos

protected Cancellable scheduleTask(ThreadPool threadPool) {
// it's fine to run it on the scheduler thread, no busy work
return threadPool.scheduleWithFixedDelay(statusChecker, interval, Names.SAME);
return threadPool.scheduleWithFixedDelay(statusChecker, interval, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public FsHealthService(Settings settings, ClusterSettings clusterSettings, Threa

@Override
protected void doStart() {
scheduledFuture = threadPool.scheduleWithFixedDelay(new FsHealthMonitor(), refreshInterval, ThreadPool.Names.GENERIC);
scheduledFuture = threadPool.scheduleWithFixedDelay(new FsHealthMonitor(), refreshInterval, threadPool.generic());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.jvm.JvmStats.GarbageCollector;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;

import java.util.HashMap;
import java.util.Locale;
Expand Down Expand Up @@ -224,7 +224,7 @@ void onSlowGc(final Threshold threshold, final long seq, final SlowGcEvent slowG
void onGcOverhead(final Threshold threshold, final long current, final long elapsed, final long seq) {
logGcOverhead(logger, threshold, current, elapsed, seq);
}
}, interval, Names.SAME);
}, interval, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

private static final String SLOW_GC_LOG_MESSAGE =
Expand Down
Loading

0 comments on commit 93b844d

Please sign in to comment.