diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 24a13dc270b7e..bd1d0e7dafb5d 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -1468,21 +1468,10 @@ public static Set snapshottingIndices(final ClusterState currentState, fi
         final Set<Index> indices = new HashSet<>();
         for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
             if (entry.partial() == false) {
-                if (entry.state() == State.INIT) {
-                    for (IndexId index : entry.indices()) {
-                        IndexMetaData indexMetaData = currentState.metaData().index(index.getName());
-                        if (indexMetaData != null && indicesToCheck.contains(indexMetaData.getIndex())) {
-                            indices.add(indexMetaData.getIndex());
-                        }
-                    }
-                } else {
-                    for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
-                        Index index = shard.key.getIndex();
-                        if (indicesToCheck.contains(index)
-                            && shard.value.state().completed() == false
-                            && currentState.getMetaData().index(index) != null) {
-                            indices.add(index);
-                        }
+                for (IndexId index : entry.indices()) {
+                    IndexMetaData indexMetaData = currentState.metaData().index(index.getName());
+                    if (indexMetaData != null && indicesToCheck.contains(indexMetaData.getIndex())) {
+                        indices.add(indexMetaData.getIndex());
+                    }
                 }
             }
         }
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index 45c66caa7900f..5e483981b14fe 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -26,6 +26,10 @@
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.RequestValidators;
 import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction;
+import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
+import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
+import org.elasticsearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction;
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction;
 import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
 import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
@@ -75,6 +79,7 @@
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.AutoCreateIndex;
 import org.elasticsearch.action.support.DestructiveOperations;
+import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -156,6 +161,7 @@
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
 import org.elasticsearch.repositories.fs.FsRepository;
@@ -233,6 +239,15 @@ public void createServices() {
     @After
     public void verifyReposThenStopServices() {
         try {
+            clearDisruptionsAndAwaitSync();
+
+            final StepListener<CleanupRepositoryResponse> cleanupResponse = new StepListener<>();
+            client().admin().cluster().cleanupRepository(
+                new CleanupRepositoryRequest("repo"), cleanupResponse);
+            final AtomicBoolean cleanedUp = new AtomicBoolean(false);
+            continueOrDie(cleanupResponse, r -> cleanedUp.set(true));
+
+            runUntil(cleanedUp::get, TimeUnit.MINUTES.toMillis(1L));
             if (blobStoreContext != null) {
                 blobStoreContext.forceConsistent();
             }
@@ -258,8 +273,8 @@ public void testSuccessfulSnapshotAndRestore() {
 
         final StepListener<CreateSnapshotResponse> createSnapshotResponseListener = new StepListener<>();
 
-        continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
-            final Runnable afterIndexing = () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+        continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
+            final Runnable afterIndexing = () -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
                 .setWaitForCompletion(true).execute(createSnapshotResponseListener);
             if (documents == 0) {
                 afterIndexing.run();
@@ -269,7 +284,7 @@ public void testSuccessfulSnapshotAndRestore() {
                     bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
                 }
                 final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
-                masterNode.client.bulk(bulkRequest, bulkResponseStepListener);
+                client().bulk(bulkRequest, bulkResponseStepListener);
                 continueOrDie(bulkResponseStepListener, bulkResponse -> {
                     assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
                     assertEquals(documents, bulkResponse.getItems().length);
@@ -281,16 +296,16 @@ public void testSuccessfulSnapshotAndRestore() {
         final StepListener<AcknowledgedResponse> deleteIndexListener = new StepListener<>();
 
         continueOrDie(createSnapshotResponseListener,
-            createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener));
+            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener));
 
         final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new StepListener<>();
-        continueOrDie(deleteIndexListener, ignored -> masterNode.client.admin().cluster().restoreSnapshot(
+        continueOrDie(deleteIndexListener, ignored -> client().admin().cluster().restoreSnapshot(
             new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener));
 
         final StepListener<SearchResponse> searchResponseListener = new StepListener<>();
         continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
             assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
-            masterNode.client.search(
+            client().search(
                 new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), searchResponseListener);
         });
@@ -307,7 +322,7 @@ public void testSuccessfulSnapshotAndRestore() {
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
 
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
@@ -319,33 +334,34 @@ public void testSuccessfulSnapshotAndRestore() {
     public void testSnapshotWithNodeDisconnects() {
         final int dataNodes = randomIntBetween(2, 10);
-        setupTestCluster(randomFrom(1, 3, 5), dataNodes);
+        final int masterNodes = randomFrom(1, 3, 5);
+        setupTestCluster(masterNodes, dataNodes);
 
         String repoName = "repo";
         String snapshotName = "snapshot";
         final String index = "test";
         final int shards = randomIntBetween(1, 10);
 
-        TestClusterNodes.TestClusterNode masterNode =
-            testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
 
         final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
 
-        continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
+        continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
             for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
                 scheduleNow(this::disconnectRandomDataNode);
             }
             if (randomBoolean()) {
                 scheduleNow(() -> testClusterNodes.clearNetworkDisruptions());
             }
-            masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener);
+            testClusterNodes.randomMasterNodeSafe().client.admin().cluster()
+                .prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener);
         });
 
         continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
             for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
                 scheduleNow(this::disconnectOrRestartDataNode);
             }
-            final boolean disconnectedMaster = randomBoolean();
+            // Only disconnect master if we have more than a single master and can simulate a failover
+            final boolean disconnectedMaster = randomBoolean() && masterNodes > 1;
             if (disconnectedMaster) {
                 scheduleNow(this::disconnectOrRestartMasterNode);
             }
@@ -368,7 +384,7 @@ public void testSnapshotWithNodeDisconnects() {
         SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertThat(finalSnapshotsInProgress.entries(), empty());
         final Repository repository = randomMaster.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
     }
@@ -385,18 +401,18 @@ public void testConcurrentSnapshotCreateAndDelete() {
         final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
 
-        continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards),
-            createIndexResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+        continueOrDie(createRepoAndIndex(repoName, index, shards),
+            createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
                 .execute(createSnapshotResponseStepListener));
 
         final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
 
-        continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().deleteSnapshot(
+        continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().deleteSnapshot(
             new DeleteSnapshotRequest(repoName, snapshotName), deleteSnapshotStepListener));
 
         final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();
 
-        continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster()
+        continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> client().admin().cluster()
             .prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createAnotherSnapshotResponseStepListener));
         continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
             assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));
@@ -407,7 +423,7 @@ public void testConcurrentSnapshotCreateAndDelete() {
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
 
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
@@ -417,6 +433,50 @@ public void testConcurrentSnapshotCreateAndDelete() {
         assertEquals(0, snapshotInfo.failedShards());
     }
 
+    public void testConcurrentSnapshotDeleteAndDeleteIndex() {
+        setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
+
+        String repoName = "repo";
+        String snapshotName = "snapshot";
+        final String index = "test";
+
+        TestClusterNodes.TestClusterNode masterNode =
+            testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
+
+        final StepListener<Collection<CreateIndexResponse>> createIndicesListener = new StepListener<>();
+
+        continueOrDie(createRepoAndIndex(repoName, index, 1), createIndexResponse -> {
+            // create a few more indices to make it more likely that the subsequent index delete operation happens before snapshot
+            // finalization
+            final int indices = randomIntBetween(5, 20);
+            final GroupedActionListener<CreateIndexResponse> listener = new GroupedActionListener<>(createIndicesListener, indices);
+            for (int i = 0; i < indices; ++i) {
+                client().admin().indices().create(new CreateIndexRequest("index-" + i), listener);
+            }
+        });
+
+        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+
+        continueOrDie(createIndicesListener, createIndexResponses ->
+            client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(false)
+                .execute(createSnapshotResponseStepListener));
+
+        continueOrDie(createSnapshotResponseStepListener,
+            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), noopListener()));
+
+        deterministicTaskQueue.runAllRunnableTasks();
+
+        SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
+        assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
+        final Repository repository = masterNode.repositoriesService.repository(repoName);
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
+        assertThat(snapshotIds, hasSize(1));
+
+        final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
+        assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
+        assertEquals(0, snapshotInfo.failedShards());
+    }
+
     /**
      * Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently
      * deleting a snapshot.
@@ -438,8 +498,8 @@ public void testSnapshotPrimaryRelocations() {
 
         final StepListener<ClusterStateResponse> clusterStateResponseStepListener = new StepListener<>();
 
-        continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards),
-            createIndexResponse -> masterAdminClient.cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener));
+        continueOrDie(createRepoAndIndex(repoName, index, shards),
+            createIndexResponse -> client().admin().cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener));
 
         continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> {
             final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0);
@@ -490,8 +550,8 @@ public void run() {
                     final SnapshotsInProgress finalSnapshotsInProgress = testClusterNodes.randomDataNodeSafe()
                         .clusterService.state().custom(SnapshotsInProgress.TYPE);
                     assertThat(finalSnapshotsInProgress.entries(), empty());
-                    final Repository repository = masterNode.repositoriesService.repository(repoName);
-                    Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+                    final Repository repository = testClusterNodes.randomMasterNodeSafe().repositoriesService.repository(repoName);
+                    Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
                     assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));
                 }
@@ -509,19 +569,18 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() {
 
         final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
 
-        continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
+        continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
             final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false);
             for (int i = 0; i < documents; ++i) {
                 // Index a few documents with different field names so we trigger a dynamic mapping update for each of them
-                masterNode.client.bulk(
-                    new BulkRequest().add(new IndexRequest(index).source(Collections.singletonMap("foo" + i, "bar")))
+                client().bulk(new BulkRequest().add(new IndexRequest(index).source(Collections.singletonMap("foo" + i, "bar")))
                         .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
                     assertNoFailureListener(
                         bulkResponse -> {
                             assertFalse("Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
                             if (initiatedSnapshot.compareAndSet(false, true)) {
-                                masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
-                                    .setWaitForCompletion(true).execute(createSnapshotResponseStepListener);
+                                client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
+                                    .execute(createSnapshotResponseStepListener);
                             }
                         }));
             }
@@ -531,7 +590,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() {
 
         final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseStepListener = new StepListener<>();
 
-        continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().restoreSnapshot(
+        continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().restoreSnapshot(
             new RestoreSnapshotRequest(repoName, snapshotName)
                 .renamePattern(index).renameReplacement(restoredIndex).waitForCompletion(true), restoreSnapshotResponseStepListener));
@@ -539,8 +598,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() {
 
         continueOrDie(restoreSnapshotResponseStepListener, restoreSnapshotResponse -> {
             assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
-            masterNode.client.search(
-                new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)),
+            client().search(new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)),
                 searchResponseStepListener);
         });
@@ -564,7 +622,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() {
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
+        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
 
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
@@ -574,18 +632,19 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() {
         assertEquals(0, snapshotInfo.failedShards());
     }
 
-    private StepListener<CreateIndexResponse> createRepoAndIndex(TestClusterNodes.TestClusterNode masterNode, String repoName,
-                                                                 String index, int shards) {
-        final AdminClient adminClient = masterNode.client.admin();
+    private RepositoryData getRepositoryData(Repository repository) {
+        return repository.getRepositoryData();
+    }
 
+    private StepListener<CreateIndexResponse> createRepoAndIndex(String repoName, String index, int shards) {
         final StepListener<AcknowledgedResponse> createRepositoryListener = new StepListener<>();
 
-        adminClient.cluster().preparePutRepository(repoName).setType(FsRepository.TYPE)
+        client().admin().cluster().preparePutRepository(repoName).setType(FsRepository.TYPE)
             .setSettings(Settings.builder().put("location", randomAlphaOfLength(10))).execute(createRepositoryListener);
 
         final StepListener<CreateIndexResponse> createIndexResponseStepListener = new StepListener<>();
 
-        continueOrDie(createRepositoryListener, acknowledgedResponse -> adminClient.indices().create(
+        continueOrDie(createRepositoryListener, acknowledgedResponse -> client().admin().indices().create(
             new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)),
             createIndexResponseStepListener));
@@ -594,11 +653,7 @@ private StepListener createRepoAndIndex(TestClusterNodes.Te
 
     private void clearDisruptionsAndAwaitSync() {
         testClusterNodes.clearNetworkDisruptions();
-        runUntil(() -> {
-            final List<Long> versions = testClusterNodes.nodes.values().stream()
-                .map(n -> n.clusterService.state().version()).distinct().collect(Collectors.toList());
-            return versions.size() == 1L;
-        }, TimeUnit.MINUTES.toMillis(1L));
+        stabilize();
     }
 
     private void disconnectOrRestartDataNode() {
@@ -635,15 +690,25 @@ private void startCluster() {
                 .filter(DiscoveryNode::isMasterNode).map(DiscoveryNode::getId).collect(Collectors.toSet()));
         testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()).forEach(
             testClusterNode -> testClusterNode.coordinator.setInitialConfiguration(votingConfiguration));
+        // Connect all nodes to each other
+        testClusterNodes.nodes.values().forEach(node -> testClusterNodes.nodes.values().forEach(
+            n -> n.transportService.connectToNode(node.node, null,
+                ActionTestUtils.assertNoFailureListener(c -> logger.info("--> Connected [{}] to [{}]", n.node, node.node)))));
+        stabilize();
+    }
 
+    private void stabilize() {
         runUntil(
             () -> {
-                List<String> masterNodeIds = testClusterNodes.nodes.values().stream()
-                    .map(node -> node.clusterService.state().nodes().getMasterNodeId())
-                    .distinct().collect(Collectors.toList());
-                return masterNodeIds.size() == 1 && masterNodeIds.contains(null) == false;
+                final Collection<ClusterState> clusterStates =
+                    testClusterNodes.nodes.values().stream().map(node -> node.clusterService.state()).collect(Collectors.toList());
+                final Set<String> masterNodeIds = clusterStates.stream()
+                    .map(clusterState -> clusterState.nodes().getMasterNodeId()).collect(Collectors.toSet());
+                final Set<Long> terms = clusterStates.stream().map(ClusterState::term).collect(Collectors.toSet());
+                final List<Long> versions = clusterStates.stream().map(ClusterState::version).distinct().collect(Collectors.toList());
+                return versions.size() == 1 && masterNodeIds.size() == 1 && masterNodeIds.contains(null) == false && terms.size() == 1;
             },
-            TimeUnit.SECONDS.toMillis(30L)
+            TimeUnit.MINUTES.toMillis(1L)
         );
     }
@@ -689,6 +754,16 @@ private static ActionListener noopListener() {
         return ActionListener.wrap(() -> {});
     }
 
+    public NodeClient client() {
+        // Select from sorted list of nodes
+        final List<TestClusterNodes.TestClusterNode> nodes = testClusterNodes.nodes.values().stream()
+            .filter(n -> testClusterNodes.disconnectedNodes.contains(n.node.getName()) == false)
+            .sorted(Comparator.comparing(n -> n.node.getName())).collect(Collectors.toList());
+        if (nodes.isEmpty()) {
+            throw new AssertionError("No nodes available");
+        }
+        return randomFrom(nodes).client;
+    }
     /**
      * Create a {@link Environment} with random path.home and path.repo
      **/
@@ -765,6 +840,7 @@ public TestClusterNode randomMasterNodeSafe() {
         public Optional<TestClusterNode> randomMasterNode() {
             // Select from sorted list of data-nodes here to not have deterministic behaviour
             final List<TestClusterNode> masterNodes = testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
+                .filter(n -> disconnectedNodes.contains(n.node.getName()) == false)
                 .sorted(Comparator.comparing(n -> n.node.getName())).collect(Collectors.toList());
             return masterNodes.isEmpty() ? Optional.empty() : Optional.of(randomFrom(masterNodes));
         }
@@ -1108,6 +1184,8 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon
                 transportService, clusterService, repositoriesService, threadPool, actionFilters, indexNameExpressionResolver
             ));
+            actions.put(CleanupRepositoryAction.INSTANCE, new TransportCleanupRepositoryAction(transportService, clusterService,
+                repositoriesService, threadPool, actionFilters, indexNameExpressionResolver));
             actions.put(CreateSnapshotAction.INSTANCE, new TransportCreateSnapshotAction(
                 transportService, clusterService, threadPool,