Skip to content

Commit

Permalink
move auditor and logging for transform being stopped, so it gets repo…
Browse files Browse the repository at this point in the history
…rted after

the p-task has been marked for completion
  • Loading branch information
Hendrik Muhs committed Sep 26, 2023
1 parent 1a48c59 commit 8b865c1
Showing 1 changed file with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,10 @@ private void finalizeCheckpoint(ActionListener<Void> listener) {

@Override
protected void afterFinishOrFailure() {
finishIndexerThreadShutdown();
finishIndexerThreadShutdown(() -> {
auditor.info(transformConfig.getId(), "Transform has stopped.");
logger.info("[{}] transform has stopped.", transformConfig.getId());
});
}

@Override
Expand Down Expand Up @@ -646,12 +649,6 @@ protected void onFailure(Exception exc) {
}
}

@Override
protected void onStop() {
auditor.info(transformConfig.getId(), "Transform has stopped.");
logger.info("[{}] transform has stopped.", transformConfig.getId());
}

@Override
protected void onAbort() {
auditor.info(transformConfig.getId(), "Received abort request, stopping transform.");
Expand Down Expand Up @@ -1203,7 +1200,7 @@ private void startIndexerThreadShutdown() {
}
}

private void finishIndexerThreadShutdown() {
private void finishIndexerThreadShutdown(Runnable next) {
synchronized (context) {
indexerThreadShuttingDown = false;
if (saveStateRequestedDuringIndexerThreadShutdown) {
Expand All @@ -1212,7 +1209,9 @@ private void finishIndexerThreadShutdown() {
if (context.shouldStopAtCheckpoint() && nextCheckpoint == null) {
stop();
}
doSaveState(getState(), getPosition(), () -> {});
doSaveState(getState(), getPosition(), next);
} else {
next.run();
}
}
}
Expand Down

0 comments on commit 8b865c1

Please sign in to comment.