[Transform] let _stats internally time out if checkpoint information cannot be retrieved (elastic#99914)

Getting the stats of a transform can time out. This change lets the retrieval of checkpointing info time out if it takes too
long. Since stats are usually requested for all transforms at once, this prevents the whole request from failing when just one transform does not respond.
Hendrik Muhs committed Sep 27, 2023
1 parent 1c2eeec commit 0c0e179
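
The core idea of the change, as a standalone sketch: give each per-transform checkpoint-info fetch its own internal deadline, so an unresponsive transform yields a per-item timeout instead of failing the aggregate _stats request. This is illustrative plain Java, not code from the commit; the class name, the fetchCheckpointInfo helper, and the 1s budget are all made up for the example.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerItemTimeoutSketch {

    // Stand-in for the per-transform checkpoint-info fetch; "slow-one" hangs past the budget.
    static CompletableFuture<String> fetchCheckpointInfo(String transformId, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep("slow-one".equals(transformId) ? 5_000 : 100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return transformId + ": checkpointing info ok";
        }, pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        // Each fetch races against its own 1s deadline; a timeout becomes a
        // per-item message instead of a failure of the whole request.
        for (String id : List.of("transform-1", "slow-one", "transform-2")) {
            String result = fetchCheckpointInfo(id, pool)
                .orTimeout(1, TimeUnit.SECONDS)
                .exceptionally(e -> id + ": timed out retrieving checkpointing info")
                .join();
            System.out.println(result);
        }
        pool.shutdown();
    }
}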
Showing 3 changed files with 37 additions and 11 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/99914.yaml
@@ -0,0 +1,5 @@
+pr: 99914
+summary: Let `_stats` internally time out if checkpoint information cannot be retrieved
+area: Transform
+type: bug
+issues: []
10 changes: 9 additions & 1 deletion x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java
@@ -25,6 +25,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
@@ -63,6 +64,9 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans

private static final Logger logger = LogManager.getLogger(TransportGetTransformStatsAction.class);

+// share of the request timeout granted to checkpoint-info retrieval; with the default timeout of 30s: 30s * 0.8 = 24s
+private static final double CHECKPOINT_INFO_TIMEOUT_SHARE = 0.8;
+
private final TransformConfigManager transformConfigManager;
private final TransformCheckpointService transformCheckpointService;
private final Client client;
@@ -114,6 +118,7 @@ protected void taskOperation(CancellableTask actionTask, Request request, Transf
// Little extra insurance, make sure we only return transforms that aren't cancelled
ClusterState state = clusterService.state();
String nodeId = state.nodes().getLocalNode().getId();
+
if (task.isCancelled() == false) {
task.getCheckpointingInfo(
transformCheckpointService,
@@ -132,7 +137,10 @@ protected void taskOperation(CancellableTask actionTask, Request request, Transf
)
);
}
-)
+),
+// at this point the transport has already spent some of its time budget in `doExecute`, and it is hard to tell
+// what is left: recording the time spent would be complex and would cross machine boundaries, so a heuristic is used here
+TimeValue.timeValueMillis((long) (request.getTimeout().millis() * CHECKPOINT_INFO_TIMEOUT_SHARE))
);
} else {
listener.onResponse(new Response(Collections.emptyList(), 0L));
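
A side note on the budget arithmetic in the hunk above, as a runnable sketch. CHECKPOINT_INFO_TIMEOUT_SHARE is the constant added by this commit; the surrounding class and variable names are illustrative only, and the 30s figure is the default timeout mentioned in the code comment.

import java.util.concurrent.TimeUnit;

public class TimeoutShareSketch {

    // The constant introduced by this commit.
    private static final double CHECKPOINT_INFO_TIMEOUT_SHARE = 0.8;

    public static void main(String[] args) {
        long requestTimeoutMillis = TimeUnit.SECONDS.toMillis(30); // default _stats timeout
        // Hand 80% of the request budget to the checkpoint-info retrieval,
        // leaving the rest for the work done before and after it: 30s * 0.8 = 24s.
        long checkpointInfoTimeoutMillis = (long) (requestTimeoutMillis * CHECKPOINT_INFO_TIMEOUT_SHARE);
        System.out.println(checkpointInfoTimeoutMillis + "ms"); // prints 24000ms
    }
}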
33 changes: 23 additions & 10 deletions x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java
@@ -12,12 +12,15 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ListenerTimeouts;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.core.TimeValue;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
@@ -179,18 +182,28 @@ public TransformIndexerStats getStats() {

public void getCheckpointingInfo(
TransformCheckpointService transformsCheckpointService,
-ActionListener<TransformCheckpointingInfo> listener
+ActionListener<TransformCheckpointingInfo> listener,
+TimeValue timeout
) {
-ActionListener<TransformCheckpointingInfoBuilder> checkPointInfoListener = ActionListener.wrap(infoBuilder -> {
-if (context.getChangesLastDetectedAt() != null) {
-infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt());
-}
-if (context.getLastSearchTime() != null) {
-infoBuilder.setLastSearchTime(context.getLastSearchTime());
-}
-listener.onResponse(infoBuilder.build());
-}, listener::onFailure);
+ActionListener<TransformCheckpointingInfoBuilder> checkPointInfoListener = ListenerTimeouts.wrapWithTimeout(
+threadPool,
+timeout,
+threadPool.generic(),
+ActionListener.wrap(infoBuilder -> {
+if (context.getChangesLastDetectedAt() != null) {
+infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt());
+}
+if (context.getLastSearchTime() != null) {
+infoBuilder.setLastSearchTime(context.getLastSearchTime());
+}
+listener.onResponse(infoBuilder.build());
+}, listener::onFailure),
+(ignore) -> listener.onFailure(
+new ElasticsearchTimeoutException(format("Timed out retrieving checkpointing info after [%s]", timeout))
+)
+);

+// TODO: pass `timeout` to the lower layers
ClientTransformIndexer transformIndexer = getIndexer();
if (transformIndexer == null) {
transformsCheckpointService.getCheckpointingInfo(
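
For readers unfamiliar with ListenerTimeouts.wrapWithTimeout used above: it wraps the listener so that exactly one outcome wins the race between the checkpoint-info response and the scheduled timeout, and the loser is silently dropped. Below is a self-contained sketch of that behavior using plain java.util.concurrent in place of Elasticsearch's ThreadPool and ActionListener; it is not the actual ListenerTimeouts implementation, and all names in it are illustrative.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class ListenerTimeoutSketch {

    interface Listener<T> {
        void onResponse(T result);
        void onFailure(Exception e);
    }

    // Wrap a listener so exactly one of onResponse/onFailure ever fires:
    // whichever of "response arrives" and "timeout elapses" happens first wins.
    static <T> Listener<T> wrapWithTimeout(ScheduledExecutorService scheduler,
                                           long timeoutMillis,
                                           Listener<T> delegate,
                                           Consumer<Listener<T>> onTimeout) {
        AtomicBoolean done = new AtomicBoolean(false);
        scheduler.schedule(() -> {
            if (done.compareAndSet(false, true)) {
                onTimeout.accept(delegate);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return new Listener<T>() {
            public void onResponse(T result) {
                if (done.compareAndSet(false, true)) {
                    delegate.onResponse(result);
                }
            }
            public void onFailure(Exception e) {
                if (done.compareAndSet(false, true)) {
                    delegate.onFailure(e);
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        Listener<String> wrapped = wrapWithTimeout(scheduler, 500,
            new Listener<String>() {
                public void onResponse(String info) { System.out.println("got: " + info); }
                public void onFailure(Exception e) { System.out.println("failed: " + e.getMessage()); }
            },
            l -> l.onFailure(new TimeoutException("Timed out retrieving checkpointing info after [500ms]")));

        // Simulate a checkpoint-info fetch that takes longer than the 500ms budget;
        // the timeout wins and the late response is dropped.
        scheduler.schedule(() -> wrapped.onResponse("checkpointing info"), 2, TimeUnit.SECONDS);

        Thread.sleep(3_000);
        scheduler.shutdownNow();
    }
}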
