From ff6eaa3f1cc1fd6e59190cefd381339ed8d8bdf1 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 19 Dec 2018 17:48:08 +0000 Subject: [PATCH 1/2] Fix race updating the job on close --- .../output/AutoDetectResultProcessor.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 28d6d76eb8395..96e825c02c109 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -432,18 +432,18 @@ private void updateEstablishedModelMemoryOnJob() { // We need to make all results written up to and including these stats available for the established memory calculation persister.commitResultWrites(jobId); + try { + jobUpdateSemaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.info("[{}] Interrupted acquiring update established model memory semaphore", jobId); + return; + } + jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { if (latestEstablishedModelMemory != establishedModelMemory) { - + client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME).submit(() -> { - try { - jobUpdateSemaphore.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.info("[{}] Interrupted acquiring update established model memory semaphore", jobId); - return; - } - JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build(); UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, 
update); updateRequest.setWaitForAck(false); @@ -465,8 +465,13 @@ public void onFailure(Exception e) { } }); }); + } else { + jobUpdateSemaphore.release(); } - }, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e)); + }, e -> { + jobUpdateSemaphore.release(); + LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e); + }); } public void awaitCompletion() throws TimeoutException { From c45ca9bfe8a20a0d69ee69683b1099ce3bf7b85f Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 20 Dec 2018 13:35:08 +0000 Subject: [PATCH 2/2] Ensure semaphore is released following rejected execution Also use execute() rather than submit() when scheduling update --- .../output/AutoDetectResultProcessor.java | 51 ++++++++++--------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 96e825c02c109..ab45c7f6017c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -442,29 +442,34 @@ private void updateEstablishedModelMemoryOnJob() { jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { if (latestEstablishedModelMemory != establishedModelMemory) { - - client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME).submit(() -> { - JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); - updateRequest.setWaitForAck(false); - - 
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, - new ActionListener<PutJobAction.Response>() { - @Override - public void onResponse(PutJobAction.Response response) { - jobUpdateSemaphore.release(); - latestEstablishedModelMemory = establishedModelMemory; - LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); - } - - @Override - public void onFailure(Exception e) { - jobUpdateSemaphore.release(); - LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + - establishedModelMemory + "]", e); - } - }); - }); + + try { + client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { + JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build(); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); + updateRequest.setWaitForAck(false); + + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, + new ActionListener<PutJobAction.Response>() { + @Override + public void onResponse(PutJobAction.Response response) { + jobUpdateSemaphore.release(); + latestEstablishedModelMemory = establishedModelMemory; + LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); + } + + @Override + public void onFailure(Exception e) { + jobUpdateSemaphore.release(); + LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + + establishedModelMemory + "]", e); + } + }); + }); + } catch (Exception e) { + jobUpdateSemaphore.release(); + LOGGER.error("[" + jobId + "] error submitting established model memory update action", e); + } } else { jobUpdateSemaphore.release(); }