diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 9294aef87526d..6296bdf1277ff 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -564,7 +564,10 @@ private void finalizeCheckpoint(ActionListener listener) { @Override protected void afterFinishOrFailure() { - finishIndexerThreadShutdown(); + finishIndexerThreadShutdown(() -> { + auditor.info(transformConfig.getId(), "Transform has stopped."); + logger.info("[{}] transform has stopped.", transformConfig.getId()); + }); } @Override @@ -646,12 +649,6 @@ protected void onFailure(Exception exc) { } } - @Override - protected void onStop() { - auditor.info(transformConfig.getId(), "Transform has stopped."); - logger.info("[{}] transform has stopped.", transformConfig.getId()); - } - @Override protected void onAbort() { auditor.info(transformConfig.getId(), "Received abort request, stopping transform."); @@ -1203,7 +1200,7 @@ private void startIndexerThreadShutdown() { } } - private void finishIndexerThreadShutdown() { + private void finishIndexerThreadShutdown(Runnable next) { synchronized (context) { indexerThreadShuttingDown = false; if (saveStateRequestedDuringIndexerThreadShutdown) { @@ -1212,7 +1209,9 @@ private void finishIndexerThreadShutdown() { if (context.shouldStopAtCheckpoint() && nextCheckpoint == null) { stop(); } - doSaveState(getState(), getPosition(), () -> {}); + doSaveState(getState(), getPosition(), next); + } else { + next.run(); } } }