diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index d8740be875d62..7451208636897 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -111,8 +111,7 @@ public void clusterChanged(ClusterChangedEvent event) { // if this is not a data node, we need to start it ourselves possibly if (event.state().nodes().getLocalNode().isDataNode() == false && isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) { - watcherService.start(event.state()); - this.state.set(WatcherState.STARTED); + watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); return; } @@ -160,8 +159,8 @@ public void clusterChanged(ClusterChangedEvent event) { if (state.get() == WatcherState.STARTED) { watcherService.reload(event.state(), "new local watcher shard allocation ids"); } else if (state.get() == WatcherState.STOPPED) { - watcherService.start(event.state()); - this.state.set(WatcherState.STARTED); + this.state.set(WatcherState.STARTING); + watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); } } else { clearAllocationIds(); @@ -172,8 +171,8 @@ public void clusterChanged(ClusterChangedEvent event) { WatcherState watcherState = this.state.get(); if (event.localNodeMaster()) { if (watcherState != WatcherState.STARTED && watcherState != WatcherState.STARTING) { - watcherService.start(event.state()); - this.state.set(WatcherState.STARTED); + this.state.set(WatcherState.STARTING); + watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); } } else { if (watcherState == WatcherState.STARTED || watcherState == WatcherState.STARTING) { diff --git 
a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index dcfb713a66580..49915674fe9e2 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -183,23 +183,40 @@ void reload(ClusterState state, String reason) { // by checking the cluster state version before and after loading the watches we can potentially just exit without applying the // changes processedClusterStateVersion.set(state.getVersion()); - pauseExecution(reason); triggerService.pauseExecution(); + int cancelledTaskCount = executionService.clearExecutionsAndQueue(); + logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), e -> logger.error("error reloading watcher", e))); } - public void start(ClusterState state) { + /** + * start the watcher service, load watches in the background + * + * @param state the current cluster state + * @param postWatchesLoadedCallback the callback to be triggered when watches were loaded successfully + */ + public void start(ClusterState state, Runnable postWatchesLoadedCallback) { + executionService.unPause(); processedClusterStateVersion.set(state.getVersion()); - executor.execute(wrapWatcherService(() -> reloadInner(state, "starting", true), + executor.execute(wrapWatcherService(() -> { + if (reloadInner(state, "starting", true)) { + postWatchesLoadedCallback.run(); + } + }, e -> logger.error("error starting watcher", e))); } /** - * reload the watches and start scheduling them + * reload watches and start scheduling them + * + * @param state the current cluster state + * @param reason the reason for reloading, will be logged + * @param loadTriggeredWatches should triggered 
watches be loaded in this run, not needed for reloading, only for starting + * @return true if no other loading of a newer cluster state happened in parallel, false otherwise */ - private synchronized void reloadInner(ClusterState state, String reason, boolean loadTriggeredWatches) { + private synchronized boolean reloadInner(ClusterState state, String reason, boolean loadTriggeredWatches) { // exit early if another thread has come in between if (processedClusterStateVersion.get() != state.getVersion()) { logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress", @@ -221,9 +238,11 @@ private synchronized void reloadInner(ClusterState state, String reason, boolean executionService.executeTriggeredWatches(triggeredWatches); } logger.debug("watch service has been reloaded, reason [{}]", reason); + return true; } else { logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress", state.getVersion(), processedClusterStateVersion.get()); + return false; } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 3200d0c90f2f9..520ae2f2306de 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -121,11 +121,25 @@ public void unPause() { } /** - * Pause the execution of the watcher executor + * Pause the execution of the watcher executor, and empty the state. + * Pausing means that no new watch executions will be done unless this pausing is explicitly unset. + * This is important when watcher is stopped, so that scheduled watches do not accidentally get executed. 
+ * This should not be used when we need to reload watcher based on some cluster state changes, then just calling + * {@link #clearExecutionsAndQueue()} is the way to go + * * @return the number of tasks that have been removed */ public int pause() { paused.set(true); + return clearExecutionsAndQueue(); + } + + /** + * Empty the currently queued tasks and wait for current executions to finish. + * + * @return the number of tasks that have been removed + */ + public int clearExecutionsAndQueue() { int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>()); this.clearExecutions(); return cancelledTaskCount; diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index e11697b3e198b..d83cacfacdff6 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -181,7 +181,7 @@ public void testManualStartStop() { reset(watcherService); when(watcherService.validate(clusterState)).thenReturn(true); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, stoppedClusterState)); - verify(watcherService, times(1)).start(eq(clusterState)); + verify(watcherService, times(1)).start(eq(clusterState), anyObject()); // no change, keep going reset(watcherService); @@ -457,7 +457,7 @@ public void testWatcherStartsOnlyOnMasterWhenOldNodesAreInCluster() throws Excep // now start again lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, watcherStoppedState)); - verify(watcherService).start(any(ClusterState.class)); + verify(watcherService).start(any(ClusterState.class), anyObject()); } public void testDistributedWatchExecutionDisabledWith5xNodesInCluster() throws Exception { @@ -509,7 +509,7 @@ public void 
testWatcherServiceDoesNotStartIfIndexTemplatesAreMissing() throws Ex when(watcherService.validate(eq(state))).thenReturn(true); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state)); - verify(watcherService, times(0)).start(any(ClusterState.class)); + verify(watcherService, times(0)).start(any(ClusterState.class), anyObject()); } public void testWatcherStopsWhenMasterNodeIsMissing() { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index 5f815170215d3..73f9271e3efda 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -68,6 +68,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -199,7 +200,7 @@ void stopExecutor() { when(client.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollFuture); clearScrollFuture.onResponse(new ClearScrollResponse(true, 1)); - service.start(clusterState); + service.start(clusterState, () -> {}); ArgumentCaptor captor = ArgumentCaptor.forClass(List.class); verify(triggerService).start(captor.capture()); @@ -238,6 +239,27 @@ void stopExecutor() { verify(triggerEngine).pauseExecution(); } + // if we have to reload the watcher service, the execution service should not be paused, as this might + // result in missing executions + public void testReloadingWatcherDoesNotPauseExecutionService() { + ExecutionService executionService = mock(ExecutionService.class); + TriggerService triggerService = mock(TriggerService.class); + WatcherService service = new WatcherService(Settings.EMPTY, triggerService, 
mock(TriggeredWatchStore.class), + executionService, mock(WatchParser.class), mock(Client.class), executorService) { + @Override + void stopExecutor() { + } + }; + + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + csBuilder.metaData(MetaData.builder()); + + service.reload(csBuilder.build(), "whatever"); + verify(executionService).clearExecutionsAndQueue(); + verify(executionService, never()).pause(); + verify(triggerService).pauseExecution(); + } + private static DiscoveryNode newNode() { return new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/ExecutionVarsIntegrationTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/ExecutionVarsIntegrationTests.java index 1b38f46f2eeac..8ab5a0f0fa348 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/ExecutionVarsIntegrationTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/ExecutionVarsIntegrationTests.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.function.Function; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput; @@ -35,6 +36,8 @@ public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTestCase { + private String watchId = randomAlphaOfLength(20); + @Override protected List> pluginTypes() { List> types = super.pluginTypes(); @@ -106,7 +109,7 @@ protected Map, Object>> pluginScripts() { public void testVars() throws 
Exception { WatcherClient watcherClient = watcherClient(); - PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder() + PutWatchResponse putWatchResponse = watcherClient.preparePutWatch(watchId).setSource(watchBuilder() .trigger(schedule(cron("0/1 * * * * ?"))) .input(simpleInput("value", 5)) .condition(new ScriptCondition( @@ -125,7 +128,7 @@ public void testVars() throws Exception { assertThat(putWatchResponse.isCreated(), is(true)); - timeWarp().trigger("_id"); + timeWarp().trigger(watchId); flush(); refresh(); @@ -134,11 +137,11 @@ public void testVars() throws Exception { // defaults to match all; }); - assertThat(searchResponse.getHits().getTotalHits(), is(1L)); + assertHitCount(searchResponse, 1L); Map source = searchResponse.getHits().getAt(0).getSourceAsMap(); - assertValue(source, "watch_id", is("_id")); + assertValue(source, "watch_id", is(watchId)); assertValue(source, "state", is("executed")); // we don't store the computed vars in history @@ -170,7 +173,7 @@ public void testVars() throws Exception { public void testVarsManual() throws Exception { WatcherClient watcherClient = watcherClient(); - PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder() + PutWatchResponse putWatchResponse = watcherClient.preparePutWatch(watchId).setSource(watchBuilder() .trigger(schedule(cron("0/1 * * * * ? 
2020"))) .input(simpleInput("value", 5)) .condition(new ScriptCondition( @@ -192,13 +195,13 @@ public void testVarsManual() throws Exception { boolean debug = randomBoolean(); ExecuteWatchResponse executeWatchResponse = watcherClient - .prepareExecuteWatch("_id") + .prepareExecuteWatch(watchId) .setDebug(debug) .get(); assertThat(executeWatchResponse.getRecordId(), notNullValue()); XContentSource source = executeWatchResponse.getRecordSource(); - assertValue(source, "watch_id", is("_id")); + assertValue(source, "watch_id", is(watchId)); assertValue(source, "state", is("executed")); if (debug) {