Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Wait for job updates in AutoDetectResultProcessor.awaitCompletion #36856

Merged
merged 3 commits into from
Dec 20, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -432,18 +432,18 @@ private void updateEstablishedModelMemoryOnJob() {
// We need to make all results written up to and including these stats available for the established memory calculation
persister.commitResultWrites(jobId);

try {
jobUpdateSemaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.info("[{}] Interrupted acquiring update established model memory semaphore", jobId);
return;
}

jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> {
if (latestEstablishedModelMemory != establishedModelMemory) {

client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME).submit(() -> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can submit() throw? I'm not sure if it is necessary to wrap this in a try...catch and release the semaphore in the catch

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it can throw rejected execution exception if the thread pool’s queue is full or it is shutting down.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it can use execute() rather than submit() as it's not interested in getting a Future to track completion. Since you're off I'll push another commit that changes both things.

try {
jobUpdateSemaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.info("[{}] Interrupted acquiring update established model memory semaphore", jobId);
return;
}

JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
updateRequest.setWaitForAck(false);
Expand All @@ -465,8 +465,13 @@ public void onFailure(Exception e) {
}
});
});
} else {
jobUpdateSemaphore.release();
}
}, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e));
}, e -> {
jobUpdateSemaphore.release();
LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e);
});
}

public void awaitCompletion() throws TimeoutException {
Expand Down