From 0c0e1797291808130e9c866b097fdcacb5b68a14 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 27 Sep 2023 10:38:22 +0200 Subject: [PATCH] [Transform] let _stats internally timeout if checkpoint information can not be retrieved (#99914) getting the stats of a transform can timeout, this change lets the retrieval of checkpointing info timeout if it takes too long. As stats are usually requested for all transforms, this prevents a failure if just 1 transform does not respond. --- docs/changelog/99914.yaml | 5 +++ .../TransportGetTransformStatsAction.java | 10 +++++- .../transform/transforms/TransformTask.java | 33 +++++++++++++------ 3 files changed, 37 insertions(+), 11 deletions(-) create mode 100644 docs/changelog/99914.yaml diff --git a/docs/changelog/99914.yaml b/docs/changelog/99914.yaml new file mode 100644 index 0000000000000..8b0026a8ff9ca --- /dev/null +++ b/docs/changelog/99914.yaml @@ -0,0 +1,5 @@ +pr: 99914 +summary: Let `_stats` internally timeout if checkpoint information can not be retrieved +area: Transform +type: bug +issues: [] diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index 07b753d8aea77..b62f0c95785b2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; @@ -63,6 +64,9 @@ public class TransportGetTransformStatsAction extends TransportTasksAction listener + ActionListener listener, + TimeValue timeout ) { - ActionListener checkPointInfoListener = ActionListener.wrap(infoBuilder -> { - if (context.getChangesLastDetectedAt() != null) { - infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt()); - } - if (context.getLastSearchTime() != null) { - infoBuilder.setLastSearchTime(context.getLastSearchTime()); - } - listener.onResponse(infoBuilder.build()); - }, listener::onFailure); + ActionListener checkPointInfoListener = ListenerTimeouts.wrapWithTimeout( + threadPool, + timeout, + threadPool.generic(), + ActionListener.wrap(infoBuilder -> { + if (context.getChangesLastDetectedAt() != null) { + infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt()); + } + if (context.getLastSearchTime() != null) { + infoBuilder.setLastSearchTime(context.getLastSearchTime()); + } + listener.onResponse(infoBuilder.build()); + }, listener::onFailure), + (ignore) -> listener.onFailure( + new ElasticsearchTimeoutException(format("Timed out retrieving checkpointing info after [%s]", timeout)) + ) + ); + // TODO: pass `timeout` to the lower layers ClientTransformIndexer transformIndexer = getIndexer(); if (transformIndexer == null) { transformsCheckpointService.getCheckpointingInfo(